Source code for dire_rapids.dire_cuvs

# dire_cuvs.py

"""
DIRE with cuVS backend for GPU-accelerated k-NN at scale.

This module provides optional cuVS integration for massive datasets.
Falls back to PyTorch if cuVS is not available.

Requirements:
    Follow the installation instructions at https://docs.rapids.ai/install/
"""

import numpy as np
import torch  # pylint: disable=unused-import # Used via parent class (self.device from DiRePyTorch)
from loguru import logger

# Import base DIRE PyTorch implementation
from .dire_pytorch import (  # pylint: disable=cyclic-import
    DiRePyTorch,
    _is_cuvs_native_metric,
    _prefer_memory_efficient_for_shape,
    _remove_self_from_knn,
)

# Try to import cuVS and CuPy
try:
    import cupy as cp
    from cuvs.neighbors import cagra, ivf_pq, ivf_flat
    CUVS_AVAILABLE = True
    logger.trace("cuVS available - GPU-accelerated k-NN enabled")
except ImportError:
    CUVS_AVAILABLE = False
    logger.trace("cuVS not available. Install RAPIDS for GPU-accelerated k-NN: "
                 "Follow the installation instructions at https://docs.rapids.ai/install/")

# Try to import cuML for GPU-accelerated PCA
try:
    from cuml.decomposition import PCA as cuPCA
    from cuml.decomposition import TruncatedSVD as cuTruncatedSVD
    CUML_AVAILABLE = True
    logger.trace("cuML available - GPU-accelerated PCA enabled")
except ImportError:
    CUML_AVAILABLE = False
    if CUVS_AVAILABLE:
        logger.trace("cuML not available but cuVS is. PCA will run on CPU.")


[docs] class DiReCuVS(DiRePyTorch): """ RAPIDS cuVS/cuML accelerated implementation of DiRe for massive datasets. This class extends DiRePyTorch with optional RAPIDS cuVS (CUDA Vector Search) integration for GPU-accelerated k-nearest neighbors computation and cuML integration for GPU-accelerated PCA initialization. It provides substantial performance improvements for large-scale datasets. Performance Advantages over PyTorch/PyKeOps ------------------------------------------- - **10-100x faster k-NN**: For large datasets (>100K points) - **Massive scale support**: Handles 10M+ points efficiently - **High accuracy**: Approximate k-NN with >95% recall - **Multi-GPU ready**: Supports extreme scale processing - **GPU-accelerated PCA**: cuML PCA/SVD for initialization Automatic Fallback ------------------ With ``knn_backend='auto'``, falls back to PyTorch backend if cuVS is not available or is not beneficial for the current data. With ``knn_backend='cuvs'``, cuVS is required and unavailable/unsupported configurations raise instead of falling back. Parameters ---------- use_cuvs : bool or None, default=None Whether to use cuVS for k-NN computation. If None, automatically detected based on availability and hardware. use_cuml : bool or None, default=None Whether to use cuML for PCA initialization. If None, automatically detected based on availability and hardware. cuvs_index_type : {'auto', 'ivf_flat', 'ivf_pq', 'cagra', 'flat'}, default='auto' Type of cuVS index to build: - 'auto': Automatically select based on data characteristics - 'ivf_flat': Inverted file index without compression - 'ivf_pq': Inverted file index with product quantization - 'cagra': Graph-based index for very large datasets - 'flat': Brute-force exact search cuvs_build_params : dict, optional Custom parameters for cuVS index building. Overrides defaults. cuvs_search_params : dict, optional Custom parameters for cuVS search. Overrides defaults. *args, **kwargs Additional arguments passed to DiRePyTorch parent class. Includes: n_components, n_neighbors, init, max_iter_layout, min_dist, spread, cutoff, neg_ratio, verbose, random_state, use_exact_repulsion, metric (custom distance function for k-NN computation), knn_backend. Attributes ---------- use_cuvs : bool Whether cuVS backend is enabled and available. use_cuml : bool Whether cuML backend is enabled and available. cuvs_index : object or None Built cuVS index for k-NN search. Examples -------- Basic usage with automatic backend selection:: from dire_rapids import DiReCuVS import numpy as np # Large dataset X = np.random.randn(100000, 512) # Auto-detect cuVS/cuML availability reducer = DiReCuVS() embedding = reducer.fit_transform(X) Force cuVS with custom index parameters:: reducer = DiReCuVS( use_cuvs=True, cuvs_index_type='ivf_pq', cuvs_build_params={'n_lists': 2048, 'pq_dim': 64} ) Massive dataset processing:: # 10M points, 1000 dimensions X = np.random.randn(10_000_000, 1000) reducer = DiReCuVS( use_cuvs=True, use_cuml=True, cuvs_index_type='cagra', # Best for very large datasets n_neighbors=32 ) embedding = reducer.fit_transform(X) With custom distance metric:: # cuVS with L1 distance for k-NN computation reducer = DiReCuVS( use_cuvs=True, metric='(x - y).abs().sum(-1)', # L1/Manhattan distance n_neighbors=32, cuvs_index_type='ivf_flat' ) embedding = reducer.fit_transform(X) Notes ----- **Requirements:** - RAPIDS cuVS: Follow the installation instructions at https://docs.rapids.ai/install/ - CUDA-capable GPU with compute capability >= 6.0 **Index Selection Guidelines:** - < 50K points: 'flat' (exact search) - 50K-500K points: 'ivf_flat' - 500K-5M points: 'ivf_pq' - > 5M points: 'cagra' (if dimensions <= 500) **Memory Considerations:** - cuVS requires float32 precision (no FP16 support) - Index building requires additional GPU memory - 'cagra' uses more memory but provides best performance for huge datasets """
[docs] def __init__( self, *args, use_cuvs=None, # Auto-detect by default use_cuml=None, # Auto-detect by default cuvs_index_type='auto', # 'auto', 'ivf_flat', 'ivf_pq', 'cagra' cuvs_build_params=None, cuvs_search_params=None, **kwargs ): """ Initialize DiReCuVS with cuVS and cuML backend configuration. Parameters ---------- *args Positional arguments passed to DiRePyTorch parent class. use_cuvs : bool or None, default=None Whether to use cuVS for k-NN computation: - None: Auto-detect based on availability and GPU presence - True: Force cuVS usage (raises error if unavailable) - False: Disable cuVS, use PyTorch backend use_cuml : bool or None, default=None Whether to use cuML for PCA initialization: - None: Auto-detect based on availability and GPU presence - True: Force cuML usage (raises error if unavailable) - False: Disable cuML, use sklearn backend cuvs_index_type : {'auto', 'ivf_flat', 'ivf_pq', 'cagra', 'flat'}, default='auto' Type of cuVS index to build: - 'auto': Automatically select optimal index based on data size/dimensionality - 'ivf_flat': Inverted file index without compression (good balance) - 'ivf_pq': Inverted file with product quantization (memory efficient) - 'cagra': Graph-based index (best for very large datasets) - 'flat': Brute-force exact search (small datasets only) cuvs_build_params : dict, optional Custom parameters for cuVS index building. These override the automatically determined parameters. See cuVS documentation for index-specific parameters. cuvs_search_params : dict, optional Custom parameters for cuVS search operations. These override the automatically determined parameters. See cuVS documentation for index-specific search parameters. **kwargs Additional keyword arguments passed to DiRePyTorch parent class. See DiRePyTorch documentation for available parameters including: n_components, n_neighbors, init, max_iter_layout, min_dist, spread, cutoff, neg_ratio, verbose, random_state, use_exact_repulsion, metric (custom distance function for k-NN computation). Raises ------ ImportError If cuVS or cuML are requested but not available. RuntimeError If GPU is required but not available. """ super().__init__(*args, **kwargs) # Auto-detect cuVS usage if use_cuvs is None: # Use cuVS if available and we have a GPU self.use_cuvs = CUVS_AVAILABLE and self.device.type == 'cuda' else: self.use_cuvs = use_cuvs and CUVS_AVAILABLE and self.device.type == 'cuda' if self.use_cuvs: self.logger.info("cuVS backend enabled for k-NN computation") cp.random.seed(self.random_state) self.logger.debug(f"Seeded CuPy random generator: {self.random_state}") else: if use_cuvs and not CUVS_AVAILABLE: self.logger.warning("cuVS requested but not available, falling back to PyTorch") elif use_cuvs and self.device.type != 'cuda': self.logger.warning("cuVS requested but CUDA is not available, falling back to PyTorch") # Auto-detect cuML usage for PCA if use_cuml is None: self.use_cuml = CUML_AVAILABLE and self.device.type == 'cuda' else: self.use_cuml = use_cuml and CUML_AVAILABLE and self.device.type == 'cuda' if self.use_cuml: self.logger.info("cuML backend enabled for PCA initialization") else: if use_cuml and not CUML_AVAILABLE: self.logger.warning("cuML requested but not available, falling back to sklearn") elif use_cuml and self.device.type != 'cuda': self.logger.warning("cuML requested but CUDA is not available, falling back to sklearn") self.cuvs_index_type = cuvs_index_type self.cuvs_build_params = cuvs_build_params self.cuvs_search_params = cuvs_search_params self.cuvs_index = None self._n_lists = None # Store n_lists for search params (older cuVS versions don't expose index.n_lists)
def _select_cuvs_index_type(self, n_samples, n_dims, metric='sqeuclidean'): """ Automatically select optimal cuVS index type based on data characteristics. This private method uses heuristics to select the most appropriate cuVS index type based on dataset size, dimensionality, metric, and performance trade-offs. Parameters ---------- n_samples : int Number of samples in the dataset. n_dims : int Number of dimensions/features per sample. metric : str, default='sqeuclidean' Distance metric to use. CAGRA only supports 'sqeuclidean' and 'inner_product'. Returns ------- str Selected cuVS index type ('flat', 'ivf_flat', 'ivf_pq', or 'cagra'). Notes ----- Private method, should not be called directly. Used by _compute_knn(). Selection Heuristics: - **< 50K samples**: 'flat' (exact search) - **50K-500K samples or >500D**: 'ivf_flat' (good balance) - **500K-5M samples**: 'ivf_pq' (memory efficient) - **> 5M samples and ≤500D**: 'cagra' (best performance, if metric is supported) - **> 5M samples and >500D**: 'ivf_pq' (high-D fallback) - **cosine metric**: Forces IVF method (CAGRA doesn't support cosine) """ if self.cuvs_index_type != 'auto': return self.cuvs_index_type # Decision tree based on scale and dimensionality # For high dimensions (>500), prefer IVF methods over graph-based if n_samples < 50000: # Small dataset - use flat (IVF with many lists) return 'flat' if n_samples < 500000 or n_dims > 500: # Medium dataset or high-D - IVF without compression # IVF-Flat works better than CAGRA for high dimensions return 'ivf_flat' if n_samples < 5000000: # Large dataset - IVF with compression return 'ivf_pq' # Very large dataset with moderate dimensions # CAGRA is best for performance, but only if metric is supported if n_dims <= 500 and metric in ('sqeuclidean', 'euclidean', 'inner_product'): return 'cagra' else: # Fallback to IVF-PQ for unsupported metrics (e.g., cosine) or high-D return 'ivf_pq' def _build_cuvs_index(self, X_gpu, index_type, metric='euclidean'): """ Build cuVS index for fast k-NN search. This private method constructs the appropriate cuVS index based on the specified index type and data characteristics, with optimized parameters for each index variant. Parameters ---------- X_gpu : cupy.ndarray Input data on GPU, shape (n_samples, n_features), dtype float32. index_type : str Type of index to build ('flat', 'ivf_flat', 'ivf_pq', 'cagra'). metric : str, default='euclidean' Distance metric to use ('euclidean' or 'inner_product'). Returns ------- cuVS index object or None Built cuVS index ready for search operations. Returns None for 'flat' type (no index needed). Notes ----- Private method, should not be called directly. Used by _compute_knn(). Index-Specific Optimizations: - **IVF-Flat**: Adaptive n_lists based on dataset size and dimensionality - **IVF-PQ**: Optimized PQ dimension and quantization parameters - **CAGRA**: Graph-based parameters tuned for large datasets Raises ------ ValueError If unknown index_type is specified. """ n_samples, n_dims = X_gpu.shape self.logger.info(f"Building cuVS {index_type} index for {n_samples} points in {n_dims}D with {metric} metric...") if index_type == 'flat': # Exact search - no index needed self.logger.info("Using brute-force search (exact)") return None if index_type == 'ivf_flat': # IVF without compression # For high-D data, use more lists for better quantization if n_dims > 500: # High-D: more lists help with curse of dimensionality n_lists = min(int(np.sqrt(n_samples) * 2), 8192) else: n_lists = min(int(np.sqrt(n_samples)), 4096) build_params = ivf_flat.IndexParams( n_lists=n_lists, metric=metric, add_data_on_build=True ) if self.cuvs_build_params: build_params.update(self.cuvs_build_params) index = ivf_flat.build(build_params, X_gpu) self._n_lists = n_lists # Store for search params self.logger.info(f"Built IVF-Flat index with {n_lists} lists for {n_dims}D data") elif index_type == 'ivf_pq': # IVF with product quantization n_lists = min(int(np.sqrt(n_samples)), 8192) pq_dim = min(n_dims // 4, 128) # Reasonable PQ dimension build_params = ivf_pq.IndexParams( n_lists=n_lists, metric=metric, pq_dim=pq_dim, pq_bits=8, add_data_on_build=True ) if self.cuvs_build_params: build_params.update(self.cuvs_build_params) index = ivf_pq.build(build_params, X_gpu) self._n_lists = n_lists # Store for search params self.logger.info(f"Built IVF-PQ index with {n_lists} lists, PQ dim={pq_dim}") elif index_type == 'cagra': # CAGRA only supports sqeuclidean and inner_product metrics if metric == 'euclidean': cagra_metric = 'sqeuclidean' elif metric == 'sqeuclidean': cagra_metric = 'sqeuclidean' elif metric == 'inner_product': cagra_metric = 'inner_product' elif metric == 'cosine': raise ValueError( "CAGRA index does not support cosine metric. " "Use IVF-Flat or IVF-PQ index types for cosine distance, " "or normalize your data and use inner_product metric." ) else: raise ValueError( f"CAGRA index does not support metric '{metric}'. " f"Valid metrics: ['sqeuclidean', 'inner_product'] " f"(euclidean is automatically converted to sqeuclidean)." ) build_params = cagra.IndexParams( metric=cagra_metric, graph_degree=32, intermediate_graph_degree=64, build_algo='nn_descent' ) if self.cuvs_build_params: build_params.update(self.cuvs_build_params) index = cagra.build(build_params, X_gpu) self.logger.info(f"Built CAGRA graph-based index with {cagra_metric} metric") else: raise ValueError(f"Unknown index type: {index_type}") return index def _search_cuvs(self, index, index_type, X_gpu, k, metric='euclidean'): """ Search cuVS index for k nearest neighbors. This private method performs k-NN search using the built cuVS index, with optimized search parameters for each index type. Parameters ---------- index : cuVS index object or None Built cuVS index from _build_cuvs_index(). index_type : str Type of index being searched ('flat', 'ivf_flat', 'ivf_pq', 'cagra'). X_gpu : cupy.ndarray Query data on GPU, shape (n_samples, n_features), dtype float32. k : int Number of nearest neighbors to find (plus 1 for self). metric : str, default='euclidean' Distance metric to use ('euclidean' or 'inner_product'). Returns ------- tuple of cupy.ndarray distances : cupy.ndarray of shape (n_samples, k+1) Distances to k+1 nearest neighbors (including self). indices : cupy.ndarray of shape (n_samples, k+1) Indices of k+1 nearest neighbors (including self). Notes ----- Private method, should not be called directly. Used by _compute_knn(). Search Parameters: - **IVF methods**: Adaptive n_probes based on index size - **CAGRA**: Optimized search width and internal parameters - **Flat**: Uses IVF-Flat with high probe count for near-exact results Raises ------ ValueError If unknown index_type is specified. """ n_samples = X_gpu.shape[0] self.logger.info(f"Searching for {k} nearest neighbors using cuVS {index_type}...") if index_type == 'flat': # For flat/brute force, just use IVF-Flat with many lists for exact search # This avoids dtype issues with brute_force module n_lists = min(int(np.sqrt(n_samples)), 1024) build_params = ivf_flat.IndexParams( n_lists=n_lists, metric=metric, add_data_on_build=True ) index = ivf_flat.build(build_params, X_gpu) # Search with high probe count for near-exact results search_params = ivf_flat.SearchParams( n_probes=min(n_lists, 256) # High probe count for accuracy ) distances, indices = ivf_flat.search( search_params, index, X_gpu, k+1 ) elif index_type == 'ivf_flat': # IVF search - use stored n_lists (compatible with older cuVS versions) n_probes = min(self._n_lists // 10, 100) if self._n_lists else 20 search_params = ivf_flat.SearchParams(n_probes=n_probes) if self.cuvs_search_params: search_params.update(self.cuvs_search_params) distances, indices = ivf_flat.search( search_params, index, X_gpu, k+1 ) elif index_type == 'ivf_pq': # IVF-PQ search - use stored n_lists (compatible with older cuVS versions) n_probes = min(self._n_lists // 10, 200) if self._n_lists else 20 search_params = ivf_pq.SearchParams(n_probes=n_probes) if self.cuvs_search_params: search_params.update(self.cuvs_search_params) distances, indices = ivf_pq.search( search_params, index, X_gpu, k+1 ) elif index_type == 'cagra': # CAGRA search search_params = cagra.SearchParams( max_queries=0, # Automatic itopk_size=min(k * 2, 256), search_width=4 ) if self.cuvs_search_params: search_params.update(self.cuvs_search_params) distances, indices = cagra.search( search_params, index, X_gpu, k+1 ) else: raise ValueError(f"Unknown index type: {index_type}") return distances, indices def _native_cuvs_metric(self): """Return whether the configured metric can run in cuVS.""" return _is_cuvs_native_metric(self.metric_spec) def _cuvs_knn_unavailable_reason(self, n_samples, n_dims): """Return a reason cuVS k-NN cannot run for this reducer, or None.""" del n_samples if not CUVS_AVAILABLE: return "RAPIDS cuVS is not installed" if self.device.type != 'cuda': return "cuVS k-NN requires a CUDA device" if not self.use_cuvs: return "cuVS k-NN is disabled on this reducer" if n_dims > 2048: return f"cuVS k-NN supports up to 2048 dimensions in this path (got {n_dims})" if not self._native_cuvs_metric(): return "cuVS k-NN only supports named metrics, not custom expressions/callables" return None def _fallback_to_pytorch_knn(self, X, chunk_size=None, use_fp16=None, reason=None): """Fallback from cuVS to a PyTorch k-NN implementation.""" n_samples, n_dims = X.shape if ( self.knn_backend in ('auto', 'pytorch') and _prefer_memory_efficient_for_shape(n_samples, n_dims, self.metric_spec) ): from .dire_pytorch_memory_efficient import DiRePyTorchMemoryEfficient # pylint: disable=import-outside-toplevel if reason is not None: self.logger.info( "Using memory-efficient PyTorch fallback for k-NN " f"because cuVS is unavailable: {reason}" ) fallback = DiRePyTorchMemoryEfficient( n_components=self.n_components, n_neighbors=self.n_neighbors, init=self.init, max_iter_layout=self.max_iter_layout, min_dist=self.min_dist, spread=self.spread, cutoff=self.cutoff, n_sample_dirs=self.n_sample_dirs, sample_size=self.sample_size, neg_ratio=self.neg_ratio, verbose=self.verbose, random_state=self.random_state, use_exact_repulsion=self.use_exact_repulsion, metric=self.metric_spec, knn_backend='pytorch' if self.knn_backend == 'auto' else self.knn_backend, knn_chunk_size=chunk_size, memory_fraction=self.knn_memory_fraction, knn_memory_fraction=self.knn_memory_fraction, knn_broadcast_memory_multiplier=self.knn_broadcast_memory_multiplier, normalize=self.normalize, ) fallback.device = self.device fallback._compute_knn(X, chunk_size=chunk_size, use_fp16=use_fp16) self._knn_indices = fallback._knn_indices self._knn_distances = fallback._knn_distances self._last_knn_backend = fallback._last_knn_backend self._last_knn_chunk_size = fallback._last_knn_chunk_size self._last_knn_distance_strategy = fallback._last_knn_distance_strategy self._last_knn_reducer = type(fallback).__name__ return None if reason is not None: self.logger.info(f"Using PyTorch backend for k-NN ({reason})") return super()._compute_knn(X, chunk_size=chunk_size, use_fp16=use_fp16) def _compute_knn(self, X, chunk_size=None, use_fp16=None): """ Compute k-NN using cuVS acceleration when available and beneficial. This method overrides the parent implementation to use cuVS for k-NN computation when it provides performance benefits, automatically falling back to PyTorch for cases where cuVS isn't optimal. Parameters ---------- X : numpy.ndarray Input data of shape (n_samples, n_features). chunk_size : int, optional Chunk size for processing (used by fallback PyTorch method). If None, the fallback computes a memory-aware chunk size. use_fp16 : bool, optional Use FP16 precision (used by fallback PyTorch method). Note: cuVS requires float32, so FP16 is only used for PyTorch fallback. Notes ----- Private method, should not be called directly. Used by fit_transform(). cuVS Usage Criteria: - cuVS backend must be enabled and available - Dataset size >= 10,000 samples (cuVS overhead not worth it for smaller datasets) - Dimensionality <= 2,048 (cuVS works best for moderate dimensions) - Only native named metrics supported (euclidean/l2, sqeuclidean, inner_product, cosine) If criteria aren't met, falls back to parent PyTorch implementation. Side Effects ------------ Sets self._knn_indices and self._knn_distances with computed k-NN graph. Cleans up GPU memory after computation. """ n_samples, n_dims = X.shape if self.knn_backend in ('pytorch', 'pykeops'): return self._fallback_to_pytorch_knn( X, chunk_size=chunk_size, use_fp16=use_fp16, reason=f"knn_backend='{self.knn_backend}' was requested", ) # Check if custom metric expression/callable is specified. # cuVS only supports named metrics, not arbitrary tensor expressions. unavailable_reason = self._cuvs_knn_unavailable_reason(n_samples, n_dims) if self.knn_backend == 'cuvs': if unavailable_reason is not None: raise RuntimeError(f"knn_backend='cuvs' requested but unavailable: {unavailable_reason}") elif unavailable_reason is not None: if not self._native_cuvs_metric(): self.logger.warning( "Custom metric expressions/callables not supported by cuVS. " "Falling back to PyTorch backend for k-NN." ) else: self.logger.info(f"Using PyTorch backend for k-NN ({unavailable_reason})") return self._fallback_to_pytorch_knn( X, chunk_size=chunk_size, use_fp16=use_fp16, reason=unavailable_reason, ) # Decide whether to use cuVS use_cuvs_for_this = ( self.knn_backend == 'cuvs' or n_samples >= 10000 # cuVS overhead not worth it for small datasets ) if not use_cuvs_for_this: # Fall back to PyTorch implementation self.logger.info("Using PyTorch backend for k-NN") return self._fallback_to_pytorch_knn( X, chunk_size=chunk_size, use_fp16=use_fp16, reason="dataset is below the cuVS auto threshold", ) # Use cuVS for k-NN self._last_knn_backend = 'cuvs' self.logger.info(f"Computing {self.n_neighbors}-NN graph using cuVS...") # Determine which metric to use for cuVS # Default to sqeuclidean (cuVS default), but allow named metrics cuvs_metric = 'sqeuclidean' if self.metric_spec is not None and isinstance(self.metric_spec, str): metric_lower = self.metric_spec.strip().lower() if metric_lower in ('euclidean', 'l2'): cuvs_metric = 'euclidean' elif metric_lower == 'sqeuclidean': cuvs_metric = 'sqeuclidean' elif metric_lower == 'inner_product': cuvs_metric = 'inner_product' elif metric_lower == 'cosine': # Supported by IVF-Flat, IVF-PQ, and flat index types # Note: CAGRA does not support cosine, will be validated in _build_cuvs_index cuvs_metric = 'cosine' # Convert to CuPy array # Note: cuVS requires float32, not float16 # cuVS also requires C-contiguous (row-major) arrays X_gpu = cp.asarray(X, dtype=cp.float32, order='C') # Select index type (pass metric to avoid selecting CAGRA for unsupported metrics) index_type = self._select_cuvs_index_type(n_samples, n_dims, cuvs_metric) # Build index if index_type != 'flat': self.cuvs_index = self._build_cuvs_index(X_gpu, index_type, cuvs_metric) else: self.cuvs_index = None # Search for k-NN distances, indices = self._search_cuvs( self.cuvs_index, index_type, X_gpu, self.n_neighbors, cuvs_metric ) # Convert to NumPy, then remove self. Self is not guaranteed to be the # first column when there are duplicate/tied distances. indices_cp = cp.asarray(indices) distances_cp = cp.asarray(distances) self._knn_indices, self._knn_distances = _remove_self_from_knn( cp.asnumpy(indices_cp), cp.asnumpy(distances_cp), 0, self.n_neighbors, ) self.logger.info(f"k-NN graph computed: shape {self._knn_indices.shape}") # Clean up GPU memory del X_gpu if self.cuvs_index is not None: del self.cuvs_index self.cuvs_index = None cp.get_default_memory_pool().free_all_blocks() def _initialize_embedding(self, X): """ Initialize embedding using cuML PCA when available, with sklearn fallback. This method overrides the parent implementation to use GPU-accelerated cuML PCA/TruncatedSVD for initialization when available, providing significant speedups for high-dimensional data. Parameters ---------- X : numpy.ndarray Input high-dimensional data of shape (n_samples, n_features). Returns ------- torch.Tensor Initial embedding of shape (n_samples, n_components) on the target device. Notes ----- Private method, should not be called directly. Used by fit_transform(). cuML Usage: - Uses TruncatedSVD for high-dimensional data (>100 features) for efficiency - Uses regular PCA for lower-dimensional data - Performs normalization on GPU before converting to PyTorch - Uses DLPack for zero-copy GPU tensor transfer Falls back to parent sklearn-based initialization if: - cuML is not available or disabled - Initialization method is not 'pca' - Any errors occur during cuML processing """ if self.use_cuml and self.init == 'pca': self.logger.info("Initializing with cuML PCA (GPU-accelerated)") # Convert to CuPy array if needed if isinstance(X, np.ndarray): X_gpu = cp.asarray(X, dtype=cp.float32) else: X_gpu = X # Use TruncatedSVD for high-dimensional data (more efficient) if X.shape[1] > 100: # TruncatedSVD is perfect for high-D to low-D reduction pca = cuTruncatedSVD( n_components=self.n_components, random_state=self.random_state ) else: # Regular PCA for lower dimensions # Note: cuPCA doesn't support random_state parameter pca = cuPCA( n_components=self.n_components ) # Fit and transform on GPU embedding_gpu = pca.fit_transform(X_gpu) # Convert to PyTorch tensor on GPU # cuML returns cupy array, convert to torch embedding_cp = cp.asarray(embedding_gpu) # Normalize on GPU embedding_cp -= embedding_cp.mean(axis=0) embedding_cp /= embedding_cp.std(axis=0) # Use DLPack for zero-copy transfer from CuPy to PyTorch. # Modern CuPy exposes the Python DLPack protocol directly; # ``toDlpack()`` is deprecated. embedding_torch = torch.from_dlpack(embedding_cp) return embedding_torch.to(self.device) # Fall back to CPU sklearn PCA return super()._initialize_embedding(X)
[docs] def fit_transform(self, X, y=None): """ Fit the model and transform data with cuVS/cuML acceleration. This method extends the parent implementation with intelligent backend selection and logging to inform users about the acceleration being used. Parameters ---------- X : array-like of shape (n_samples, n_features) High-dimensional input data to transform. y : array-like of shape (n_samples,), optional Ignored. Present for scikit-learn API compatibility. Returns ------- numpy.ndarray of shape (n_samples, n_components) Low-dimensional embedding of the input data. Notes ----- Backend Selection Logic: - Uses cuVS for k-NN if dataset is large enough and cuVS is available - Uses cuML for PCA initialization if available and init='pca' - Falls back to PyTorch implementations automatically Performance Benefits: - cuVS k-NN: 10-100x speedup for large datasets - cuML PCA: 5-50x speedup for high-dimensional initialization Examples -------- Large dataset with cuVS acceleration:: import numpy as np from dire_rapids import DiReCuVS # 500K points, 1000 dimensions X = np.random.randn(500000, 1000) reducer = DiReCuVS(verbose=True) # Will log backend selection embedding = reducer.fit_transform(X) # Output: "Using cuVS-accelerated backend for 500000 points" """ # Log backend being used cuvs_unavailable_reason = self._cuvs_knn_unavailable_reason(X.shape[0], X.shape[1]) if self.use_cuvs and X.shape[0] >= 10000 and cuvs_unavailable_reason is None: self.logger.info(f"Using cuVS-accelerated backend for {X.shape[0]} points") else: if cuvs_unavailable_reason is None: self.logger.info(f"Using PyTorch backend for {X.shape[0]} points") else: self.logger.info( f"Using PyTorch fallback for {X.shape[0]} points " f"({cuvs_unavailable_reason})" ) return super().fit_transform(X, y)