# metrics.py

"""
Performance metrics for dimensionality reduction evaluation.

This module provides GPU-accelerated metrics using RAPIDS cuML for evaluating
the quality of dimensionality reduction embeddings, including:

- Distortion metrics (stress)
- Context preservation metrics (SVM, kNN classification)
- Topological metrics (persistent homology, Betti curves)

Persistent-homology summaries (Betti curves) are computed with the kNN-based
atlas approach from the companion betti_curve module; Betti curves of the data
and of the embedding are compared via the fastdtw library.
"""

import warnings
import numpy as np

# CuPy for GPU arrays
try:
    import cupy as cp
    HAS_CUPY = True
except ImportError:
    HAS_CUPY = False
    warnings.warn("CuPy not available. GPU acceleration disabled.", UserWarning)

# sklearn imports (always needed for CPU fallback paths)
from sklearn.neighbors import NearestNeighbors
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.svm import LinearSVC
from sklearn.neighbors import KNeighborsClassifier

# RAPIDS cuML imports
try:
    from cuml.neighbors import NearestNeighbors as cumlNearestNeighbors
    from cuml.model_selection import train_test_split as cuml_train_test_split
    from cuml.preprocessing import StandardScaler as cumlStandardScaler
    from cuml.svm import SVC as cumlSVC
    from cuml.neighbors import KNeighborsClassifier as cumlKNeighborsClassifier
    HAS_CUML = True
except ImportError:
    HAS_CUML = False
    warnings.warn(
        "cuML not available. Falling back to CPU-based scikit-learn. "
        "Install RAPIDS for GPU acceleration.",
        UserWarning
    )

# Additional dependencies for persistence
try:
    from fastdtw import fastdtw
    HAS_DTW = True
except ImportError:
    HAS_DTW = False
    warnings.warn("fastdtw not available. DTW distance will not be available.", UserWarning)



#
# Auxiliary functions
#


def welford_update_gpu(count, mean, M2, new_value, finite_threshold=1e12):
    """
    GPU-accelerated Welford's algorithm update step.

    Parameters
    ----------
    count : cupy.ndarray
        Running count of valid values
    mean : cupy.ndarray
        Running mean
    M2 : cupy.ndarray
        Running sum of squared differences
    new_value : cupy.ndarray
        New values to incorporate
    finite_threshold : float
        Maximum magnitude for inclusion

    Returns
    -------
    tuple : Updated (count, mean, M2)
    """
    is_finite = cp.isfinite(new_value) & (cp.abs(new_value) < finite_threshold)
    count = count + is_finite
    delta = new_value - mean
    mean = mean + cp.where(is_finite, delta / cp.maximum(count, 1), 0)
    delta2 = new_value - mean
    M2 = M2 + cp.where(is_finite, delta * delta2, 0)
    return count, mean, M2

def welford_finalize_gpu(count, mean, M2):
    """
    Finalize Welford's algorithm to compute mean and std.

    Parameters
    ----------
    count : cupy.ndarray
        Total count of valid values
    mean : cupy.ndarray
        Computed mean
    M2 : cupy.ndarray
        Sum of squared differences

    Returns
    -------
    tuple : (mean, std)
    """
    variance = cp.where(count > 1, M2 / (count - 1), 0.0)
    return mean, cp.sqrt(variance)

def welford_gpu(data):
    """
    GPU-accelerated computation of mean and std.

    Parameters
    ----------
    data : cupy.ndarray or numpy.ndarray
        Input data

    Returns
    -------
    tuple : (mean, std)
    """
    if not HAS_CUPY:
        # Fallback to NumPy
        data_flat = np.asarray(data).ravel()
        mean = np.nanmean(data_flat)
        std = np.nanstd(data_flat)
        return float(mean), float(std)

    if isinstance(data, np.ndarray):
        data = cp.asarray(data)
    mean = float(cp.nanmean(data))
    std = float(cp.nanstd(data))
    return mean, std

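# Illustrative sketch (not part of the module API): how the three Welford
# helpers compose for streaming per-feature statistics. The `_demo_` name and
# the random data are ours; this path assumes CuPy is available.
def _demo_welford_streaming(n_rows=1000, n_cols=8):
    """Stream rows through Welford updates, then finalize to (mean, std)."""
    data = cp.random.standard_normal((n_rows, n_cols))
    count = cp.zeros(n_cols)
    mean = cp.zeros(n_cols)
    M2 = cp.zeros(n_cols)
    for i in range(n_rows):
        # One update per incoming row; each column is an independent stream.
        count, mean, M2 = welford_update_gpu(count, mean, M2, data[i])
    stream_mean, stream_std = welford_finalize_gpu(count, mean, M2)
    # welford_gpu returns the scalar summary of the full array for comparison.
    flat_mean, flat_std = welford_gpu(data)
    return stream_mean, stream_std, flat_mean, flat_std
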
def threshold_subsample_gpu(data, layout, labels=None, threshold=0.5, random_state=42):
    """
    GPU-accelerated Bernoulli subsampling of data.

    Parameters
    ----------
    data : array-like
        High-dimensional data
    layout : array-like
        Low-dimensional embedding
    labels : array-like, optional
        Data labels
    threshold : float
        Probability of keeping each sample (must be between 0.0 and 1.0)
    random_state : int
        Random seed

    Returns
    -------
    tuple : Subsampled arrays

    Raises
    ------
    ValueError
        If threshold is not between 0.0 and 1.0
    """
    if not 0.0 <= threshold <= 1.0:
        raise ValueError(f"subsample_threshold must be between 0.0 and 1.0, got {threshold}")

    if HAS_CUPY:
        cp.random.seed(random_state)

        # Convert to CuPy if needed
        data_gpu = cp.asarray(data) if not isinstance(data, cp.ndarray) else data
        layout_gpu = cp.asarray(layout) if not isinstance(layout, cp.ndarray) else layout

        n_samples = data_gpu.shape[0]
        random_numbers = cp.random.uniform(0, 1, size=n_samples)
        selected_indices = random_numbers < threshold

        data_sub = data_gpu[selected_indices]
        layout_sub = layout_gpu[selected_indices]

        if labels is not None:
            labels_gpu = cp.asarray(labels) if not isinstance(labels, cp.ndarray) else labels
            labels_sub = labels_gpu[selected_indices]
            return data_sub, layout_sub, labels_sub
        return data_sub, layout_sub

    # CPU fallback
    np.random.seed(random_state)
    data_np = np.asarray(data)
    layout_np = np.asarray(layout)

    n_samples = data_np.shape[0]
    random_numbers = np.random.uniform(0, 1, size=n_samples)
    selected_indices = random_numbers < threshold

    data_sub = data_np[selected_indices]
    layout_sub = layout_np[selected_indices]

    if labels is not None:
        labels_np = np.asarray(labels)
        labels_sub = labels_np[selected_indices]
        return data_sub, layout_sub, labels_sub
    return data_sub, layout_sub

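# Usage sketch for threshold_subsample_gpu: keep roughly 30% of rows,
# consistently across data, layout, and labels. Shapes and values here are
# illustrative only.
def _demo_subsample():
    rng = np.random.default_rng(0)
    X = rng.normal(size=(1000, 50)).astype(np.float32)   # high-dimensional data
    Y = rng.normal(size=(1000, 2)).astype(np.float32)    # 2-D embedding
    y = rng.integers(0, 5, size=1000)                    # labels
    X_sub, Y_sub, y_sub = threshold_subsample_gpu(X, Y, y, threshold=0.3)
    # Each row is kept with probability 0.3 (Bernoulli), so expect ~300 rows.
    return X_sub.shape, Y_sub.shape, y_sub.shape
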
#
# kNN graph construction
#

def make_knn_graph_gpu(data, n_neighbors, batch_size=50000):
    """
    GPU-accelerated kNN graph construction using cuML.

    Parameters
    ----------
    data : array-like
        Data points (n_samples, n_features)
    n_neighbors : int
        Number of nearest neighbors
    batch_size : int, optional
        Batch size for querying neighbors (queries in batches against full dataset).
        Default 50000 balances GPU memory and performance.
        Set to None to query all at once.

    Returns
    -------
    tuple : (distances, indices) arrays of shape (n_samples, n_neighbors+1)
    """
    if not HAS_CUML:
        # Fallback to CPU
        return make_knn_graph_cpu(data, n_neighbors, batch_size)

    # Convert to CuPy if needed
    if HAS_CUPY:
        data_gpu = cp.asarray(data, dtype=cp.float32)
    else:
        data_gpu = np.asarray(data, dtype=np.float32)

    n_samples = data_gpu.shape[0]

    # Fit on entire dataset
    nn = cumlNearestNeighbors(n_neighbors=n_neighbors + 1, metric='euclidean')
    nn.fit(data_gpu)

    if batch_size is None or batch_size >= n_samples:
        # Query all at once
        distances, indices = nn.kneighbors(data_gpu)
    else:
        # Query in batches (each batch queries against full dataset for memory efficiency)
        if HAS_CUPY:
            distances = cp.zeros((n_samples, n_neighbors + 1), dtype=cp.float32)
            indices = cp.zeros((n_samples, n_neighbors + 1), dtype=cp.int32)
        else:
            distances = np.zeros((n_samples, n_neighbors + 1), dtype=np.float32)
            indices = np.zeros((n_samples, n_neighbors + 1), dtype=np.int32)

        for start_idx in range(0, n_samples, batch_size):
            end_idx = min(start_idx + batch_size, n_samples)
            # Query batch against FULL dataset
            batch_distances, batch_indices = nn.kneighbors(data_gpu[start_idx:end_idx])
            distances[start_idx:end_idx] = batch_distances
            indices[start_idx:end_idx] = batch_indices

    # Convert to CuPy arrays if not already
    if HAS_CUPY:
        distances = cp.asarray(distances)
        indices = cp.asarray(indices)

    return distances, indices

def make_knn_graph_cpu(data, n_neighbors, batch_size=10000):
    """
    CPU fallback for kNN graph construction.

    Parameters
    ----------
    data : array-like
        Data points
    n_neighbors : int
        Number of nearest neighbors
    batch_size : int, optional
        Batch size for querying neighbors (queries in batches against full dataset).
        Default 10000 provides a good balance between memory and performance.
        Set to None to query all at once.

    Returns
    -------
    tuple : (distances, indices) arrays
    """
    data_np = np.asarray(data, dtype=np.float32)
    n_samples = data_np.shape[0]

    # Fit on entire dataset
    nn = NearestNeighbors(n_neighbors=n_neighbors + 1, metric='euclidean')
    nn.fit(data_np)

    if batch_size is None or batch_size >= n_samples:
        # Query all at once
        distances, indices = nn.kneighbors(data_np)
    else:
        # Query in batches (each batch queries against full dataset for memory efficiency)
        distances = np.zeros((n_samples, n_neighbors + 1), dtype=np.float32)
        indices = np.zeros((n_samples, n_neighbors + 1), dtype=np.int32)

        for start_idx in range(0, n_samples, batch_size):
            end_idx = min(start_idx + batch_size, n_samples)
            # Query batch against FULL dataset
            batch_distances, batch_indices = nn.kneighbors(data_np[start_idx:end_idx])
            distances[start_idx:end_idx] = batch_distances
            indices[start_idx:end_idx] = batch_indices

    return distances, indices

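# Usage sketch: build a 15-NN graph. The first returned column is each point
# itself (distance 0), which the downstream metrics strip off with [:, 1:].
# Random data here is illustrative; without cuML this falls back to the CPU path.
def _demo_knn_graph():
    X = np.random.default_rng(0).normal(size=(2000, 10)).astype(np.float32)
    distances, indices = make_knn_graph_gpu(X, n_neighbors=15)
    # Both arrays have shape (2000, 16): self plus 15 neighbors per point.
    return distances.shape, indices.shape
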
#
# Distortion metrics (stress)
#

def compute_stress(data, layout, n_neighbors, eps=1e-6, use_gpu=True):
    """
    Compute normalized stress (distortion) of an embedding.

    This metric measures how well distances are preserved between the
    high-dimensional data and low-dimensional layout.

    Parameters
    ----------
    data : array-like
        High-dimensional data (n_samples, n_features)
    layout : array-like
        Low-dimensional embedding (n_samples, n_components)
    n_neighbors : int
        Number of nearest neighbors to consider
    eps : float
        Small constant to prevent division by zero
    use_gpu : bool
        Whether to use GPU acceleration

    Returns
    -------
    float : Normalized stress value
    """
    if use_gpu and HAS_CUML and HAS_CUPY:
        # GPU version
        data_gpu = cp.asarray(data, dtype=cp.float32)
        layout_gpu = cp.asarray(layout, dtype=cp.float32)

        # Compute kNN graph in high-dimensional space
        distances, indices = make_knn_graph_gpu(data_gpu, n_neighbors)

        # Distances are already L2 (Euclidean) from cuML
        distances = distances[:, 1:]  # Remove self
        indices = indices[:, 1:]

        # Compute distances in low-dimensional space (vectorized)
        neighbor_coords = layout_gpu[indices]    # (n_samples, k, n_components)
        point_coords = layout_gpu[:, None, :]    # (n_samples, 1, n_components)
        distances_emb = cp.linalg.norm(neighbor_coords - point_coords, axis=2)

        # Compute normalized stress
        ratios = cp.abs(distances / cp.maximum(distances_emb, eps) - 1.0)
        stress_mean = float(cp.mean(ratios))
        stress_std = float(cp.std(ratios))
        stress_normalized = 0.0 if stress_mean < eps else stress_std / stress_mean
    else:
        # CPU version
        data_np = np.asarray(data, dtype=np.float32)
        layout_np = np.asarray(layout, dtype=np.float32)

        distances, indices = make_knn_graph_cpu(data_np, n_neighbors)
        distances = distances[:, 1:]
        indices = indices[:, 1:]

        # Compute distances in embedding (vectorized)
        neighbor_coords = layout_np[indices]     # (n_samples, k, n_components)
        point_coords = layout_np[:, None, :]     # (n_samples, 1, n_components)
        distances_emb = np.linalg.norm(neighbor_coords - point_coords, axis=2)

        ratios = np.abs(distances / np.maximum(distances_emb, eps) - 1.0)
        stress_mean = float(np.mean(ratios))
        stress_std = float(np.std(ratios))
        stress_normalized = 0.0 if stress_mean < eps else stress_std / stress_mean

    return stress_normalized

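# Sanity-check sketch for compute_stress (our example, not part of the API):
# under a uniform rescaling of the layout, every ratio d_hd/d_ld equals the
# same constant, so the spread (std) of the ratios is ~0 and the normalized
# stress (std/mean) is ~0.
def _demo_stress_uniform_scale():
    rng = np.random.default_rng(0)
    X = rng.normal(size=(500, 3)).astype(np.float32)
    Y = 2.0 * X  # uniformly scaled "embedding" of the same points
    return compute_stress(X, Y, n_neighbors=10, use_gpu=False)  # ~0.0
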
def compute_neighbor_score(data, layout, n_neighbors, use_gpu=True):
    """
    Compute neighborhood preservation score.

    Measures how well k-nearest neighbor relationships are preserved
    from high-dimensional to low-dimensional space.

    Parameters
    ----------
    data : array-like
        High-dimensional data
    layout : array-like
        Low-dimensional embedding
    n_neighbors : int
        Number of neighbors to consider
    use_gpu : bool
        Whether to use GPU acceleration

    Returns
    -------
    list : [mean_score, std_score]
    """
    if use_gpu and HAS_CUML and HAS_CUPY:
        data_gpu = cp.asarray(data, dtype=cp.float32)
        layout_gpu = cp.asarray(layout, dtype=cp.float32)

        _, indices_data = make_knn_graph_gpu(data_gpu, n_neighbors)
        _, indices_layout = make_knn_graph_gpu(layout_gpu, n_neighbors)

        indices_data = indices_data[:, 1:]      # Remove self
        indices_layout = indices_layout[:, 1:]

        # Vectorized set intersection via broadcasting:
        # indices_data[:, :, None] == indices_layout[:, None, :] gives (N, k, k) bool
        # any(axis=2) collapses to (N, k), sum(axis=1) counts matches per row.
        indices_data_np = cp.asnumpy(indices_data)
        indices_layout_np = cp.asnumpy(indices_layout)
        k = indices_data_np.shape[1]
        matches = (indices_data_np[:, :, None] == indices_layout_np[:, None, :]).any(axis=2)
        preservation_scores = matches.sum(axis=1).astype(np.float32) / k

        neighbor_mean = float(np.mean(preservation_scores))
        neighbor_std = float(np.std(preservation_scores))
    else:
        data_np = np.asarray(data, dtype=np.float32)
        layout_np = np.asarray(layout, dtype=np.float32)

        _, indices_data = make_knn_graph_cpu(data_np, n_neighbors)
        _, indices_layout = make_knn_graph_cpu(layout_np, n_neighbors)

        indices_data = indices_data[:, 1:]
        indices_layout = indices_layout[:, 1:]

        # Vectorized set intersection via broadcasting (see above).
        k = indices_data.shape[1]
        matches = (indices_data[:, :, None] == indices_layout[:, None, :]).any(axis=2)
        preservation_scores = matches.sum(axis=1).astype(np.float32) / k

        neighbor_mean = float(np.mean(preservation_scores))
        neighbor_std = float(np.std(preservation_scores))

    return [neighbor_mean, neighbor_std]

def compute_local_metrics(data, layout, n_neighbors, subsample_threshold=1.0,
                          random_state=42, use_gpu=True):
    """
    Compute local quality metrics (stress and neighborhood preservation).

    Parameters
    ----------
    data : array-like
        High-dimensional data
    layout : array-like
        Low-dimensional embedding
    n_neighbors : int
        Number of neighbors for kNN graph
    subsample_threshold : float
        Subsampling probability (must be between 0.0 and 1.0, default 1.0 = no subsampling)
    random_state : int
        Random seed for subsampling
    use_gpu : bool
        Whether to use GPU acceleration

    Returns
    -------
    dict : Dictionary containing 'stress' and 'neighbor' metrics

    Raises
    ------
    ValueError
        If subsample_threshold is not between 0.0 and 1.0
    """
    if not 0.0 <= subsample_threshold <= 1.0:
        raise ValueError(f"subsample_threshold must be between 0.0 and 1.0, got {subsample_threshold}")

    # Apply subsampling if threshold < 1.0
    if subsample_threshold < 1.0:
        data_sub, layout_sub = threshold_subsample_gpu(
            data, layout,
            threshold=subsample_threshold,
            random_state=random_state
        )
        # Convert back to numpy if needed
        if HAS_CUPY and isinstance(data_sub, cp.ndarray):
            data_sub = cp.asnumpy(data_sub)
            layout_sub = cp.asnumpy(layout_sub)

        metrics = {
            'stress': compute_stress(data_sub, layout_sub, n_neighbors, use_gpu=use_gpu),
            'neighbor': compute_neighbor_score(data_sub, layout_sub, n_neighbors, use_gpu=use_gpu),
            'n_samples': len(data_sub)
        }
    else:
        metrics = {
            'stress': compute_stress(data, layout, n_neighbors, use_gpu=use_gpu),
            'neighbor': compute_neighbor_score(data, layout, n_neighbors, use_gpu=use_gpu),
            'n_samples': len(data)
        }

    return metrics

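# Usage sketch for the local metrics on a crude random linear projection.
# The data, projection, and parameter choices are illustrative only.
def _demo_local_metrics():
    rng = np.random.default_rng(0)
    X = rng.normal(size=(2000, 64)).astype(np.float32)
    P = rng.normal(size=(64, 2)).astype(np.float32)
    Y = X @ P  # toy linear "embedding"
    # Returns {'stress': ..., 'neighbor': [mean, std], 'n_samples': ~1000}.
    return compute_local_metrics(X, Y, n_neighbors=15,
                                 subsample_threshold=0.5, use_gpu=False)
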
#
# Context preservation metrics (SVM, kNN classification)
#

def compute_svm_accuracy(X, y, test_size=0.3, reg_param=1.0, max_iter=1000,
                         random_state=42, use_gpu=True):
    """
    Compute SVM classification accuracy.

    Parameters
    ----------
    X : array-like
        Features
    y : array-like
        Labels
    test_size : float
        Test set proportion
    reg_param : float
        Regularization parameter
    max_iter : int
        Maximum iterations
    random_state : int
        Random seed
    use_gpu : bool
        Whether to use cuML GPU acceleration

    Returns
    -------
    float : Classification accuracy
    """
    if use_gpu and HAS_CUML:
        if HAS_CUPY:
            X_gpu = cp.asarray(X, dtype=cp.float32)
            y_gpu = cp.asarray(y)
        else:
            X_gpu = np.asarray(X, dtype=np.float32)
            y_gpu = np.asarray(y)

        # Train/test split
        X_train, X_test, y_train, y_test = cuml_train_test_split(
            X_gpu, y_gpu, test_size=test_size, random_state=random_state
        )

        # Standardize
        scaler = cumlStandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)

        # Train SVM
        model = cumlSVC(C=reg_param, max_iter=max_iter, kernel='linear')
        model.fit(X_train, y_train)

        # Predict and compute accuracy
        y_pred = model.predict(X_test)
        if HAS_CUPY:
            accuracy = float(cp.mean(y_pred == y_test))
        else:
            accuracy = float(np.mean(y_pred == y_test))
    else:
        X_np = np.asarray(X, dtype=np.float32)
        y_np = np.asarray(y)

        X_train, X_test, y_train, y_test = train_test_split(
            X_np, y_np, test_size=test_size, random_state=random_state
        )

        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)

        model = LinearSVC(C=reg_param, max_iter=max_iter)
        model.fit(X_train, y_train)

        y_pred = model.predict(X_test)
        from sklearn.metrics import accuracy_score  # pylint: disable=import-outside-toplevel
        accuracy = accuracy_score(y_test, y_pred)

    return accuracy

def compute_knn_accuracy(X, y, n_neighbors=16, test_size=0.3, random_state=42, use_gpu=True):
    """
    Compute kNN classification accuracy.

    Parameters
    ----------
    X : array-like
        Features
    y : array-like
        Labels
    n_neighbors : int
        Number of neighbors
    test_size : float
        Test set proportion
    random_state : int
        Random seed
    use_gpu : bool
        Whether to use cuML GPU acceleration

    Returns
    -------
    float : Classification accuracy
    """
    if use_gpu and HAS_CUML:
        if HAS_CUPY:
            X_gpu = cp.asarray(X, dtype=cp.float32)
            y_gpu = cp.asarray(y)
        else:
            X_gpu = np.asarray(X, dtype=np.float32)
            y_gpu = np.asarray(y)

        X_train, X_test, y_train, y_test = cuml_train_test_split(
            X_gpu, y_gpu, test_size=test_size, random_state=random_state
        )

        scaler = cumlStandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)

        model = cumlKNeighborsClassifier(n_neighbors=n_neighbors)
        model.fit(X_train, y_train)

        y_pred = model.predict(X_test)
        if HAS_CUPY:
            accuracy = float(cp.mean(y_pred == y_test))
        else:
            accuracy = float(np.mean(y_pred == y_test))
    else:
        X_np = np.asarray(X, dtype=np.float32)
        y_np = np.asarray(y)

        X_train, X_test, y_train, y_test = train_test_split(
            X_np, y_np, test_size=test_size, random_state=random_state
        )

        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)

        model = KNeighborsClassifier(n_neighbors=n_neighbors)
        model.fit(X_train, y_train)

        y_pred = model.predict(X_test)
        from sklearn.metrics import accuracy_score  # pylint: disable=import-outside-toplevel
        accuracy = accuracy_score(y_test, y_pred)

    return accuracy

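# Usage sketch for the two plain accuracy helpers on labeled Gaussian blobs.
# sklearn.datasets.make_blobs is used only for this illustration; it is not
# a dependency of the module itself.
def _demo_classifier_accuracy():
    from sklearn.datasets import make_blobs  # pylint: disable=import-outside-toplevel
    X, y = make_blobs(n_samples=1000, centers=4, n_features=16, random_state=0)
    acc_svm = compute_svm_accuracy(X, y, use_gpu=False)
    acc_knn = compute_knn_accuracy(X, y, n_neighbors=8, use_gpu=False)
    return acc_svm, acc_knn
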
def compute_svm_score(data, layout, labels, subsample_threshold=0.5,
                      random_state=42, use_gpu=True, **kwargs):
    """
    Compute SVM context preservation score.

    Compares SVM classification accuracy on high-dimensional data
    vs low-dimensional embedding.

    Parameters
    ----------
    data : array-like
        High-dimensional data
    layout : array-like
        Low-dimensional embedding
    labels : array-like
        Class labels
    subsample_threshold : float
        Subsampling probability (must be between 0.0 and 1.0)
    random_state : int
        Random seed
    use_gpu : bool
        Whether to use GPU acceleration
    **kwargs : dict
        Additional parameters for SVM

    Returns
    -------
    ndarray : [acc_hd, acc_ld, log_ratio]

    Raises
    ------
    ValueError
        If subsample_threshold is not between 0.0 and 1.0
    """
    if not 0.0 <= subsample_threshold <= 1.0:
        raise ValueError(f"subsample_threshold must be between 0.0 and 1.0, got {subsample_threshold}")

    # Subsample
    X_hd, X_ld, y = threshold_subsample_gpu(  # pylint: disable=unbalanced-tuple-unpacking
        data, layout, labels,
        threshold=subsample_threshold,
        random_state=random_state
    )

    # Convert back to NumPy for sklearn/cuml
    if HAS_CUPY and isinstance(X_hd, cp.ndarray):
        X_hd = cp.asnumpy(X_hd)
        X_ld = cp.asnumpy(X_ld)
        y = cp.asnumpy(y)

    test_size = kwargs.pop('test_size', 0.3)
    reg_param = kwargs.pop('reg_param', 1.0)
    max_iter = kwargs.pop('max_iter', 1000)

    svm_acc_hd = compute_svm_accuracy(
        X_hd, y, test_size=test_size, reg_param=reg_param,
        max_iter=max_iter, random_state=random_state, use_gpu=use_gpu
    )
    svm_acc_ld = compute_svm_accuracy(
        X_ld, y, test_size=test_size, reg_param=reg_param,
        max_iter=max_iter, random_state=random_state, use_gpu=use_gpu
    )

    svm_score = np.log(np.minimum(svm_acc_hd / svm_acc_ld, svm_acc_ld / svm_acc_hd))

    return np.array([svm_acc_hd, svm_acc_ld, svm_score], dtype=np.float32)

def compute_knn_score(data, layout, labels, n_neighbors=16, subsample_threshold=0.5,
                      random_state=42, use_gpu=True, **kwargs):
    """
    Compute kNN context preservation score.

    Compares kNN classification accuracy on high-dimensional data
    vs low-dimensional embedding.

    Parameters
    ----------
    data : array-like
        High-dimensional data
    layout : array-like
        Low-dimensional embedding
    labels : array-like
        Class labels
    n_neighbors : int
        Number of neighbors for kNN
    subsample_threshold : float
        Subsampling probability (must be between 0.0 and 1.0)
    random_state : int
        Random seed
    use_gpu : bool
        Whether to use GPU acceleration
    **kwargs : dict
        Additional parameters

    Returns
    -------
    ndarray : [acc_hd, acc_ld, log_ratio]

    Raises
    ------
    ValueError
        If subsample_threshold is not between 0.0 and 1.0
    """
    if not 0.0 <= subsample_threshold <= 1.0:
        raise ValueError(f"subsample_threshold must be between 0.0 and 1.0, got {subsample_threshold}")

    # Subsample
    X_hd, X_ld, y = threshold_subsample_gpu(  # pylint: disable=unbalanced-tuple-unpacking
        data, layout, labels,
        threshold=subsample_threshold,
        random_state=random_state
    )

    # Convert back to NumPy for sklearn/cuml
    if HAS_CUPY and isinstance(X_hd, cp.ndarray):
        X_hd = cp.asnumpy(X_hd)
        X_ld = cp.asnumpy(X_ld)
        y = cp.asnumpy(y)

    test_size = kwargs.pop('test_size', 0.3)

    knn_acc_hd = compute_knn_accuracy(
        X_hd, y, n_neighbors=n_neighbors, test_size=test_size,
        random_state=random_state, use_gpu=use_gpu
    )
    knn_acc_ld = compute_knn_accuracy(
        X_ld, y, n_neighbors=n_neighbors, test_size=test_size,
        random_state=random_state, use_gpu=use_gpu
    )

    # Signed log-ratio (unlike the symmetric SVM score above):
    # negative when the embedding loses accuracy relative to the raw data.
    knn_score = np.log(knn_acc_ld / knn_acc_hd)

    return np.array([knn_acc_hd, knn_acc_ld, knn_score], dtype=np.float32)

def compute_context_measures(data, layout, labels, subsample_threshold=0.5,
                             n_neighbors=16, random_state=42, use_gpu=True, **kwargs):
    """
    Compute context preservation measures (SVM and kNN).

    Parameters
    ----------
    data : array-like
        High-dimensional data
    layout : array-like
        Low-dimensional embedding
    labels : array-like
        Class labels
    subsample_threshold : float
        Subsampling probability (must be between 0.0 and 1.0)
    n_neighbors : int
        Number of neighbors for kNN
    random_state : int
        Random seed
    use_gpu : bool
        Whether to use GPU acceleration
    **kwargs : dict
        Additional parameters

    Returns
    -------
    dict : Dictionary with 'svm' and 'knn' scores

    Raises
    ------
    ValueError
        If subsample_threshold is not between 0.0 and 1.0
    """
    if not 0.0 <= subsample_threshold <= 1.0:
        raise ValueError(f"subsample_threshold must be between 0.0 and 1.0, got {subsample_threshold}")

    measures = {
        'svm': compute_svm_score(
            data, layout, labels,
            subsample_threshold, random_state, use_gpu,
            **kwargs
        ),
        'knn': compute_knn_score(
            data, layout, labels,
            n_neighbors, subsample_threshold, random_state, use_gpu,
            **kwargs
        )
    }

    return measures

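# Interpretation sketch (illustrative): both score functions return
# [acc_hd, acc_ld, score]. The SVM score is log(min(hd/ld, ld/hd)) <= 0,
# with 0 meaning the embedding preserves class structure as well as the raw
# data; the kNN score is the signed log(acc_ld/acc_hd). make_blobs and the
# toy coordinate-slice embedding below are assumptions of this example.
def _demo_context_measures():
    from sklearn.datasets import make_blobs  # pylint: disable=import-outside-toplevel
    X, y = make_blobs(n_samples=2000, centers=5, n_features=32, random_state=0)
    Y = X[:, :2]  # keep two coordinates as a toy "embedding"
    return compute_context_measures(X, Y, y, subsample_threshold=0.5, use_gpu=False)
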
#
# Persistent homology and Betti curves
#

#
# Fast H0 and H1 computation using kNN-based sparse Rips filtration
#

def compute_h0_h1_knn(data, k_neighbors=20, density_threshold=0.8,
                      overlap_factor=1.5, use_gpu=True):
    """
    Compute H0/H1 Betti numbers using local kNN atlas approach.

    Build dense local triangulations around each point, then merge consistently.
    This avoids the "holes" problem of global sparse kNN graphs.

    Automatically selects between GPU and CPU implementation based on
    availability and the use_gpu parameter.

    Parameters
    ----------
    data : array-like
        Point cloud data (n_samples, n_features)
    k_neighbors : int
        Size of local neighborhood (default 20, recommended 15-20 for noisy data)
    density_threshold : float
        Percentile threshold for edge inclusion (0-1). Lower = denser triangulation.
        Default 0.8 means edges up to 80th percentile of local distances are included.
    overlap_factor : float
        Factor for expanding local neighborhoods to ensure overlap (default 1.5).
        Higher values create more dense, overlapping patches.
    use_gpu : bool
        Whether to use GPU acceleration (if available)

    Returns
    -------
    tuple : (beta_0, beta_1)
        Betti numbers: β₀ (connected components), β₁ (loops)
    """
    # Use compute_betti_curve with n_steps=1 to get Betti numbers at full complex
    from .betti_curve import compute_betti_curve  # pylint: disable=import-outside-toplevel

    result = compute_betti_curve(
        data,
        k_neighbors=k_neighbors,
        density_threshold=density_threshold,
        overlap_factor=overlap_factor,
        n_steps=1,
        use_gpu=use_gpu
    )

    # Extract Betti numbers from the result
    beta_0 = int(result['beta_0'][0])
    beta_1 = int(result['beta_1'][0])

    return beta_0, beta_1

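# Sanity-check sketch (our example): points sampled from a noisy circle should
# give one connected component and one loop, i.e. (beta_0, beta_1) = (1, 1).
# Exact values depend on k_neighbors/density_threshold and on the companion
# betti_curve module, so treat the expected output as indicative, not exact.
def _demo_betti_circle():
    rng = np.random.default_rng(0)
    t = rng.uniform(0, 2 * np.pi, size=500)
    X = np.column_stack([np.cos(t), np.sin(t)]).astype(np.float32)
    X += rng.normal(scale=0.02, size=X.shape).astype(np.float32)
    return compute_h0_h1_knn(X, k_neighbors=15, use_gpu=False)
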
def compute_global_metrics(data, layout, subsample_threshold=0.5, random_state=42,
                           n_steps=100, k_neighbors=20, density_threshold=0.8,
                           overlap_factor=1.5, use_gpu=False, metrics_only=True):
    """
    Compute global topological metrics based on Betti curve comparison.

    Computes Betti curves for high-dimensional data and low-dimensional
    embedding using the atlas approach, then compares them using fastDTW distance.

    Parameters
    ----------
    data : array-like
        High-dimensional data
    layout : array-like
        Low-dimensional embedding
    subsample_threshold : float
        Subsampling probability (must be between 0.0 and 1.0)
    random_state : int
        Random seed
    n_steps : int
        Number of points for Betti curves
    k_neighbors : int
        Size of local neighborhood for atlas approach (default 20)
    density_threshold : float
        Percentile threshold for edge inclusion (0-1, default 0.8)
    overlap_factor : float
        Factor for expanding local neighborhoods (default 1.5)
    use_gpu : bool
        Whether to use GPU acceleration
    metrics_only : bool
        If True, return only metrics; otherwise include Betti curves

    Returns
    -------
    dict : Dictionary containing DTW distances for β₀ and β₁ curves

    Raises
    ------
    ValueError
        If subsample_threshold is not between 0.0 and 1.0,
        or if fastdtw is not available
    """
    if not 0.0 <= subsample_threshold <= 1.0:
        raise ValueError(f"subsample_threshold must be between 0.0 and 1.0, got {subsample_threshold}")
    if not HAS_DTW:
        raise ValueError("fastdtw not available. Install it with: pip install fastdtw")

    from .betti_curve import compute_betti_curve  # pylint: disable=import-outside-toplevel

    # Subsample data
    data_sub, layout_sub = threshold_subsample_gpu(
        data, layout,
        threshold=subsample_threshold,
        random_state=random_state
    )

    # Convert to NumPy if needed
    if HAS_CUPY and isinstance(data_sub, cp.ndarray):
        data_np = cp.asnumpy(data_sub)
        layout_np = cp.asnumpy(layout_sub)
    else:
        data_np = np.asarray(data_sub, dtype=np.float32)
        layout_np = np.asarray(layout_sub, dtype=np.float32)

    # Compute Betti curves for high-dimensional data
    result_hd = compute_betti_curve(
        data_np,
        k_neighbors=k_neighbors,
        density_threshold=density_threshold,
        overlap_factor=overlap_factor,
        n_steps=n_steps,
        use_gpu=use_gpu
    )

    # Compute Betti curves for low-dimensional embedding
    result_ld = compute_betti_curve(
        layout_np,
        k_neighbors=k_neighbors,
        density_threshold=density_threshold,
        overlap_factor=overlap_factor,
        n_steps=n_steps,
        use_gpu=use_gpu
    )

    # Extract Betti curves
    filtration_hd = result_hd['filtration_values']
    beta_0_hd = result_hd['beta_0']
    beta_1_hd = result_hd['beta_1']

    filtration_ld = result_ld['filtration_values']
    beta_0_ld = result_ld['beta_0']
    beta_1_ld = result_ld['beta_1']

    # Compute DTW distances between Betti curves
    # For β₀ curves
    seq0_beta0 = np.column_stack([filtration_hd, beta_0_hd])
    seq1_beta0 = np.column_stack([filtration_ld, beta_0_ld])
    dtw_beta0, _ = fastdtw(seq0_beta0, seq1_beta0, dist=2)

    # For β₁ curves
    seq0_beta1 = np.column_stack([filtration_hd, beta_1_hd])
    seq1_beta1 = np.column_stack([filtration_ld, beta_1_ld])
    dtw_beta1, _ = fastdtw(seq0_beta1, seq1_beta1, dist=2)

    # Normalize by number of points
    n_points = len(data_np)
    dtw_beta0 /= n_points
    dtw_beta1 /= n_points

    metrics = {
        'dtw_beta0': float(dtw_beta0),
        'dtw_beta1': float(dtw_beta1)
    }

    if metrics_only:
        return {'metrics': metrics, 'backend': 'atlas'}

    # Include Betti curves if requested
    betti_curves = {
        'data': {
            'filtration': filtration_hd,
            'beta_0': beta_0_hd,
            'beta_1': beta_1_hd
        },
        'layout': {
            'filtration': filtration_ld,
            'beta_0': beta_0_ld,
            'beta_1': beta_1_ld
        }
    }

    return {'metrics': metrics, 'bettis': betti_curves, 'backend': 'atlas'}

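# Usage sketch (illustrative): compare Betti curves of data vs embedding via
# DTW; lower dtw_beta0/dtw_beta1 means closer topological agreement. Requires
# fastdtw and the companion betti_curve module; the toy arrays are ours.
def _demo_global_metrics():
    rng = np.random.default_rng(0)
    X = rng.normal(size=(1000, 16)).astype(np.float32)
    Y = X[:, :2].copy()  # toy "embedding"
    # Returns {'metrics': {'dtw_beta0': ..., 'dtw_beta1': ...}, 'backend': 'atlas'}.
    return compute_global_metrics(X, Y, subsample_threshold=0.5, n_steps=50)
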
#
# Convenience function for all metrics
#

def evaluate_embedding(data, layout, labels=None, n_neighbors=16,
                       subsample_threshold=0.5, random_state=42, use_gpu=True,
                       compute_distortion=True, compute_context=True,
                       compute_topology=True, **kwargs):
    """
    Comprehensive evaluation of a dimensionality reduction embedding.

    Computes distortion, context preservation, and topological metrics.

    Parameters
    ----------
    data : array-like
        High-dimensional data (n_samples, n_features)
    layout : array-like
        Low-dimensional embedding (n_samples, n_components)
    labels : array-like, optional
        Class labels for context metrics
    n_neighbors : int
        Number of neighbors for kNN metrics
    subsample_threshold : float
        Subsampling probability for all metrics (must be between 0.0 and 1.0, default 0.5)
    random_state : int
        Random seed
    use_gpu : bool
        Whether to use GPU acceleration
    compute_distortion : bool
        Whether to compute distortion metrics (default True)
    compute_context : bool
        Whether to compute context metrics (default True)
    compute_topology : bool
        Whether to compute topological metrics (default True)
    **kwargs : dict
        Additional parameters for specific metrics

    Returns
    -------
    dict : Dictionary with all computed metrics

    Raises
    ------
    ValueError
        If subsample_threshold is not between 0.0 and 1.0
    """
    if not 0.0 <= subsample_threshold <= 1.0:
        raise ValueError(f"subsample_threshold must be between 0.0 and 1.0, got {subsample_threshold}")

    results = {}

    # Local metrics (distortion)
    if compute_distortion:
        print("Computing local metrics (stress, neighbor preservation)...")
        results['local'] = compute_local_metrics(
            data, layout, n_neighbors,
            subsample_threshold=subsample_threshold,
            random_state=random_state,
            use_gpu=use_gpu
        )

    # Context metrics (if labels provided)
    if compute_context and labels is not None:
        print("Computing context preservation metrics (SVM, kNN)...")
        results['context'] = compute_context_measures(
            data, layout, labels,
            subsample_threshold, n_neighbors, random_state, use_gpu,
            **kwargs
        )

    # Global topological metrics using Betti curve comparison
    if compute_topology:
        print("Computing topological metrics using Betti curve comparison...")
        results['topology'] = compute_global_metrics(
            data, layout,
            subsample_threshold, random_state,
            n_steps=100,
            use_gpu=use_gpu
        )

    return results

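# End-to-end usage sketch (illustrative, not part of the module API):
# evaluate a PCA embedding of random blob data. make_blobs and PCA come from
# scikit-learn and are assumptions of this example; compute_topology is gated
# on HAS_DTW so the demo runs even without fastdtw installed.
if __name__ == "__main__":
    from sklearn.datasets import make_blobs
    from sklearn.decomposition import PCA

    X_demo, y_demo = make_blobs(n_samples=5000, centers=8, n_features=64,
                                random_state=0)
    layout_demo = PCA(n_components=2).fit_transform(X_demo)
    scores = evaluate_embedding(X_demo, layout_demo, labels=y_demo,
                                compute_topology=HAS_DTW)
    print(scores)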