# Source code for dire_rapids.dire_cuvs

# dire_cuvs.py

"""
DIRE with cuVS backend for GPU-accelerated k-NN at scale.

This module provides optional cuVS integration for massive datasets.
Falls back to PyTorch if cuVS is not available.

Requirements:
    Follow the installation instructions at https://docs.rapids.ai/install/
"""

import numpy as np
import torch  # pylint: disable=unused-import # Used via parent class (self.device from DiRePyTorch)
from loguru import logger

# Import base DIRE PyTorch implementation
from .dire_pytorch import DiRePyTorch  # pylint: disable=cyclic-import

# Try to import cuVS and CuPy.
# CUVS_AVAILABLE is the single flag the rest of the module checks before
# touching `cp`, `cagra`, `ivf_pq` or `ivf_flat`. If either import below fails,
# the names imported after the failing statement are left unbound, so they must
# never be used when CUVS_AVAILABLE is False.
try:
    import cupy as cp
    from cuvs.neighbors import cagra, ivf_pq, ivf_flat
    CUVS_AVAILABLE = True
    logger.info("cuVS available - GPU-accelerated k-NN enabled")
except ImportError:
    CUVS_AVAILABLE = False
    logger.warning("cuVS not available. Install RAPIDS for GPU-accelerated k-NN: "
                  "Follow the installation instructions at https://docs.rapids.ai/install/")

# Try to import cuML for GPU-accelerated PCA.
# CUML_AVAILABLE guards every use of `cuPCA` / `cuTruncatedSVD`; both names are
# unbound when the import fails.
try:
    from cuml.decomposition import PCA as cuPCA
    from cuml.decomposition import TruncatedSVD as cuTruncatedSVD
    CUML_AVAILABLE = True
    logger.info("cuML available - GPU-accelerated PCA enabled")
except ImportError:
    CUML_AVAILABLE = False
    # Only warn about the missing cuML when cuVS is present; when both are
    # missing, the cuVS warning above has already been emitted.
    if CUVS_AVAILABLE:
        logger.warning("cuML not available but cuVS is. PCA will run on CPU.")


class DiReCuVS(DiRePyTorch):
    """
    RAPIDS cuVS/cuML accelerated implementation of DiRe for massive datasets.

    This class extends DiRePyTorch with optional RAPIDS cuVS (CUDA Vector
    Search) integration for GPU-accelerated k-nearest neighbors computation
    and cuML integration for GPU-accelerated PCA initialization. It provides
    substantial performance improvements for large-scale datasets.

    Performance Advantages over PyTorch/PyKeOps
    -------------------------------------------
    - **10-100x faster k-NN**: For large datasets (>100K points)
    - **Massive scale support**: Handles 10M+ points efficiently
    - **High accuracy**: Approximate k-NN with >95% recall
    - **Multi-GPU ready**: Supports extreme scale processing
    - **GPU-accelerated PCA**: cuML PCA/SVD for initialization

    Automatic Fallback
    ------------------
    Falls back to the PyTorch backend if cuVS is not available (and to the
    parent initialization if cuML is not available), ensuring compatibility
    across different environments.

    Parameters
    ----------
    use_cuvs : bool or None, default=None
        Whether to use cuVS for k-NN computation. If None, automatically
        detected based on availability and hardware.
    use_cuml : bool or None, default=None
        Whether to use cuML for PCA initialization. If None, automatically
        detected based on availability and hardware.
    cuvs_index_type : {'auto', 'ivf_flat', 'ivf_pq', 'cagra', 'flat'}, default='auto'
        Type of cuVS index to build:

        - 'auto': Automatically select based on data characteristics
        - 'ivf_flat': Inverted file index without compression
        - 'ivf_pq': Inverted file index with product quantization
        - 'cagra': Graph-based index for very large datasets
        - 'flat': Near-exact search, implemented with a temporary IVF-Flat
          index searched with a high probe count
    cuvs_build_params : dict, optional
        Custom parameters for cuVS index building. Overrides defaults.
    cuvs_search_params : dict, optional
        Custom parameters for cuVS search. Overrides defaults.
    *args, **kwargs
        Additional arguments passed to DiRePyTorch parent class. Includes:
        n_components, n_neighbors, init, max_iter_layout, min_dist, spread,
        cutoff, neg_ratio, verbose, random_state, use_exact_repulsion,
        metric (custom distance function for k-NN computation).

    Attributes
    ----------
    use_cuvs : bool
        Whether cuVS backend is enabled and available.
    use_cuml : bool
        Whether cuML backend is enabled and available.
    cuvs_index : object or None
        cuVS index for k-NN search. Only populated while ``_compute_knn`` is
        running; it is reset to None afterwards to release GPU memory.

    Examples
    --------
    Basic usage with automatic backend selection::

        from dire_rapids import DiReCuVS
        import numpy as np

        # Large dataset
        X = np.random.randn(100000, 512)

        # Auto-detect cuVS/cuML availability
        reducer = DiReCuVS()
        embedding = reducer.fit_transform(X)

    Force cuVS with custom index parameters::

        reducer = DiReCuVS(
            use_cuvs=True,
            cuvs_index_type='ivf_pq',
            cuvs_build_params={'n_lists': 2048, 'pq_dim': 64}
        )

    Massive dataset processing::

        # 10M points, 1000 dimensions
        X = np.random.randn(10_000_000, 1000)

        reducer = DiReCuVS(
            use_cuvs=True,
            use_cuml=True,
            cuvs_index_type='cagra',  # Best for very large datasets
            n_neighbors=32
        )
        embedding = reducer.fit_transform(X)

    With custom distance metric::

        # A custom metric expression is not supported by cuVS, so k-NN
        # falls back to the PyTorch backend for this configuration.
        reducer = DiReCuVS(
            use_cuvs=True,
            metric='(x - y).abs().sum(-1)',  # L1/Manhattan distance
            n_neighbors=32,
            cuvs_index_type='ivf_flat'
        )
        embedding = reducer.fit_transform(X)

    Notes
    -----
    **Requirements:**

    - RAPIDS cuVS: Follow the installation instructions at
      https://docs.rapids.ai/install/
    - CUDA-capable GPU with compute capability >= 6.0

    **Index Selection Guidelines ('auto'):**

    - < 50K points: 'flat'
    - 50K-500K points, or more than 500 dimensions: 'ivf_flat'
    - 500K-5M points: 'ivf_pq'
    - >= 5M points: 'cagra' (dimensions <= 500)

    **Memory Considerations:**

    - cuVS requires float32 precision (no FP16 support)
    - Index building requires additional GPU memory
    - 'cagra' uses more memory but provides best performance for huge datasets
    """

    def __init__(
        self,
        *args,
        use_cuvs=None,  # Auto-detect by default
        use_cuml=None,  # Auto-detect by default
        cuvs_index_type='auto',  # 'auto', 'ivf_flat', 'ivf_pq', 'cagra', 'flat'
        cuvs_build_params=None,
        cuvs_search_params=None,
        **kwargs
    ):
        """
        Initialize DiReCuVS with cuVS and cuML backend configuration.

        Parameters
        ----------
        *args
            Positional arguments passed to DiRePyTorch parent class.
        use_cuvs : bool or None, default=None
            Whether to use cuVS for k-NN computation:

            - None: Auto-detect based on availability and GPU presence
            - True: Use cuVS if it is importable; otherwise a warning is
              logged and the PyTorch backend is used
            - False: Disable cuVS, use PyTorch backend
        use_cuml : bool or None, default=None
            Whether to use cuML for PCA initialization:

            - None: Auto-detect based on availability and GPU presence
            - True: Use cuML if it is importable; otherwise a warning is
              logged and the parent initialization is used
            - False: Disable cuML, use the parent initialization
        cuvs_index_type : {'auto', 'ivf_flat', 'ivf_pq', 'cagra', 'flat'}, default='auto'
            Type of cuVS index to build:

            - 'auto': Automatically select index based on data size/dimensionality
            - 'ivf_flat': Inverted file index without compression (good balance)
            - 'ivf_pq': Inverted file with product quantization (memory efficient)
            - 'cagra': Graph-based index (best for very large datasets)
            - 'flat': Near-exact search via a temporary IVF-Flat index
        cuvs_build_params : dict, optional
            Custom parameters for cuVS index building. These override the
            automatically determined parameters. See cuVS documentation for
            index-specific parameters.
        cuvs_search_params : dict, optional
            Custom parameters for cuVS search operations. These override the
            automatically determined parameters. See cuVS documentation for
            index-specific search parameters.
        **kwargs
            Additional keyword arguments passed to DiRePyTorch parent class.
            See DiRePyTorch documentation for available parameters including:
            n_components, n_neighbors, init, max_iter_layout, min_dist, spread,
            cutoff, neg_ratio, verbose, random_state, use_exact_repulsion,
            metric (custom distance function for k-NN computation).

        Notes
        -----
        This constructor does not raise when a requested backend is missing;
        it logs a warning and falls back instead.
        """
        super().__init__(*args, **kwargs)

        # Seed CuPy random generator for cuML/cuVS operations (if available)
        if CUVS_AVAILABLE:
            cp.random.seed(self.random_state)
            if self.verbose:
                self.logger.debug(f"Seeded CuPy random generator: {self.random_state}")

        # Auto-detect cuVS usage
        if use_cuvs is None:
            # Use cuVS if available and we have a GPU
            self.use_cuvs = CUVS_AVAILABLE and self.device.type == 'cuda'
        else:
            # bool() keeps the attribute a real bool even for truthy non-bools.
            self.use_cuvs = bool(use_cuvs) and CUVS_AVAILABLE

        if self.use_cuvs:
            logger.info("cuVS backend enabled for k-NN computation")
        elif use_cuvs and not CUVS_AVAILABLE:
            logger.warning("cuVS requested but not available, falling back to PyTorch")

        # Auto-detect cuML usage for PCA
        if use_cuml is None:
            self.use_cuml = CUML_AVAILABLE and self.device.type == 'cuda'
        else:
            self.use_cuml = bool(use_cuml) and CUML_AVAILABLE

        if self.use_cuml:
            logger.info("cuML backend enabled for PCA initialization")
        elif use_cuml and not CUML_AVAILABLE:
            logger.warning("cuML requested but not available, falling back to sklearn")

        self.cuvs_index_type = cuvs_index_type
        self.cuvs_build_params = cuvs_build_params
        self.cuvs_search_params = cuvs_search_params
        self.cuvs_index = None

    @staticmethod
    def _merge_params(defaults, overrides):
        """Return a new dict of ``defaults`` with user ``overrides`` applied on top."""
        merged = dict(defaults)
        if overrides:
            merged.update(overrides)
        return merged

    def _select_cuvs_index_type(self, n_samples, n_dims):
        """
        Select the cuVS index type from the dataset size and dimensionality.

        Parameters
        ----------
        n_samples : int
            Number of samples in the dataset.
        n_dims : int
            Number of dimensions/features per sample.

        Returns
        -------
        str
            ``self.cuvs_index_type`` unchanged when it is not 'auto';
            otherwise one of 'flat', 'ivf_flat', 'ivf_pq' or 'cagra'.

        Notes
        -----
        Private method, should not be called directly. Used by _compute_knn().

        Selection heuristics (first matching rule wins):

        - **< 50K samples**: 'flat'
        - **< 500K samples, or > 500 dimensions**: 'ivf_flat'
        - **< 5M samples**: 'ivf_pq'
        - **otherwise (>= 5M samples, <= 500 dimensions)**: 'cagra'
        """
        if self.cuvs_index_type != 'auto':
            return self.cuvs_index_type

        if n_samples < 50000:
            # Small dataset
            return 'flat'

        if n_samples < 500000 or n_dims > 500:
            # Medium dataset or high-D: IVF without compression is preferred
            # over the graph-based index for high dimensions.
            return 'ivf_flat'

        if n_samples < 5000000:
            # Large dataset - IVF with compression
            return 'ivf_pq'

        # Very large dataset. n_dims <= 500 is guaranteed here because every
        # higher-dimensional case already returned 'ivf_flat' above.
        return 'cagra'

    def _build_cuvs_index(self, X_gpu, index_type, metric='euclidean'):
        """
        Build a cuVS index for fast k-NN search.

        Default build parameters are derived from the data shape;
        ``self.cuvs_build_params`` (if given) overrides them key by key before
        the cuVS ``IndexParams`` object is constructed.

        Parameters
        ----------
        X_gpu : cupy.ndarray
            Input data on GPU, shape (n_samples, n_features), dtype float32.
        index_type : str
            Type of index to build ('flat', 'ivf_flat', 'ivf_pq', 'cagra').
        metric : str, default='euclidean'
            Distance metric name passed through to cuVS
            ('euclidean' or 'inner_product').

        Returns
        -------
        cuVS index object or None
            Built cuVS index ready for search operations.
            Returns None for 'flat' (the index is built at search time).

        Raises
        ------
        ValueError
            If unknown index_type is specified.

        Notes
        -----
        Private method, should not be called directly. Used by _compute_knn().
        """
        n_samples, n_dims = X_gpu.shape

        self.logger.info(f"Building cuVS {index_type} index for {n_samples} points in {n_dims}D with {metric} metric...")

        if index_type == 'flat':
            # No persistent index; _search_cuvs builds a temporary one.
            self.logger.info("Using brute-force search (exact)")
            return None

        if index_type == 'ivf_flat':
            # IVF without compression.
            # For high-D data, use more lists for better quantization.
            if n_dims > 500:
                n_lists = min(int(np.sqrt(n_samples) * 2), 8192)
            else:
                n_lists = min(int(np.sqrt(n_samples)), 4096)

            params = self._merge_params(
                {'n_lists': n_lists, 'metric': metric, 'add_data_on_build': True},
                self.cuvs_build_params,
            )
            index = ivf_flat.build(ivf_flat.IndexParams(**params), X_gpu)
            self.logger.info(f"Built IVF-Flat index with {params['n_lists']} lists for {n_dims}D data")

        elif index_type == 'ivf_pq':
            # IVF with product quantization
            n_lists = min(int(np.sqrt(n_samples)), 8192)
            pq_dim = min(n_dims // 4, 128)  # Reasonable PQ dimension

            params = self._merge_params(
                {
                    'n_lists': n_lists,
                    'metric': metric,
                    'pq_dim': pq_dim,
                    'pq_bits': 8,
                    'add_data_on_build': True,
                },
                self.cuvs_build_params,
            )
            index = ivf_pq.build(ivf_pq.IndexParams(**params), X_gpu)
            self.logger.info(f"Built IVF-PQ index with {params['n_lists']} lists, PQ dim={params['pq_dim']}")

        elif index_type == 'cagra':
            # Graph-based index for very large datasets.
            # NOTE(review): confirm that the installed cuVS accepts the
            # `graph_build_algo` keyword and metric='euclidean' for CAGRA;
            # some releases name the keyword `build_algo` and only accept
            # 'sqeuclidean'/'inner_product'.
            params = self._merge_params(
                {
                    'metric': metric,
                    'graph_degree': 32,
                    'intermediate_graph_degree': 64,
                    'graph_build_algo': 'nn_descent',
                },
                self.cuvs_build_params,
            )
            index = cagra.build(cagra.IndexParams(**params), X_gpu)
            self.logger.info("Built CAGRA graph-based index")

        else:
            raise ValueError(f"Unknown index type: {index_type}")

        return index

    def _search_cuvs(self, index, index_type, X_gpu, k, metric='euclidean'):
        """
        Search a cuVS index for the k nearest neighbors of every row of X_gpu.

        The search requests ``k + 1`` neighbors so that the caller can drop
        the self-match. ``self.cuvs_search_params`` (if given) overrides the
        default search parameters for 'ivf_flat', 'ivf_pq' and 'cagra'; it is
        not applied to 'flat'.

        Parameters
        ----------
        index : cuVS index object or None
            Built cuVS index from _build_cuvs_index(). Ignored for 'flat'.
        index_type : str
            Type of index being searched ('flat', 'ivf_flat', 'ivf_pq', 'cagra').
        X_gpu : cupy.ndarray
            Query data on GPU, shape (n_samples, n_features), dtype float32.
        k : int
            Number of nearest neighbors to find (excluding self).
        metric : str, default='euclidean'
            Distance metric; only used to build the temporary 'flat' index.

        Returns
        -------
        tuple
            ``(distances, indices)`` as returned by the cuVS search call,
            each covering ``k + 1`` neighbors per query row.

        Raises
        ------
        ValueError
            If unknown index_type is specified.

        Notes
        -----
        Private method, should not be called directly. Used by _compute_knn().
        """
        n_samples = X_gpu.shape[0]
        n_query_neighbors = k + 1  # +1 for the self-match

        self.logger.info(f"Searching for {k} nearest neighbors using cuVS {index_type}...")

        if index_type == 'flat':
            # For flat/brute force, just use IVF-Flat with many lists.
            # This avoids dtype issues with brute_force module.
            n_lists = min(int(np.sqrt(n_samples)), 1024)
            build_params = ivf_flat.IndexParams(
                n_lists=n_lists,
                metric=metric,
                add_data_on_build=True
            )
            index = ivf_flat.build(build_params, X_gpu)

            # Search with high probe count for near-exact results
            search_params = ivf_flat.SearchParams(
                n_probes=min(n_lists, 256)
            )
            distances, indices = ivf_flat.search(
                search_params, index, X_gpu, n_query_neighbors
            )

        elif index_type == 'ivf_flat':
            # Probe ~10% of the lists, but never zero (n_lists < 10 would
            # otherwise round down to 0 probes).
            params = self._merge_params(
                {'n_probes': max(1, min(index.n_lists // 10, 100))},
                self.cuvs_search_params,
            )
            distances, indices = ivf_flat.search(
                ivf_flat.SearchParams(**params), index, X_gpu, n_query_neighbors
            )

        elif index_type == 'ivf_pq':
            params = self._merge_params(
                {'n_probes': max(1, min(index.n_lists // 10, 200))},
                self.cuvs_search_params,
            )
            distances, indices = ivf_pq.search(
                ivf_pq.SearchParams(**params), index, X_gpu, n_query_neighbors
            )

        elif index_type == 'cagra':
            # The internal candidate list must be at least as long as the
            # number of neighbors requested, otherwise large k cannot be served.
            params = self._merge_params(
                {
                    'max_queries': 0,  # Automatic
                    'itopk_size': max(n_query_neighbors, min(k * 2, 256)),
                    'search_width': 4,
                },
                self.cuvs_search_params,
            )
            distances, indices = cagra.search(
                cagra.SearchParams(**params), index, X_gpu, n_query_neighbors
            )

        else:
            raise ValueError(f"Unknown index type: {index_type}")

        return distances, indices

    def _compute_knn(self, X, chunk_size=50000, use_fp16=None):
        """
        Compute the k-NN graph, using cuVS when enabled and applicable.

        Overrides the parent implementation. Delegates to the parent
        ``_compute_knn`` when any of the cuVS criteria below is not met.

        Parameters
        ----------
        X : numpy.ndarray
            Input data of shape (n_samples, n_features).
        chunk_size : int, default=50000
            Chunk size for processing (only forwarded to the parent fallback).
        use_fp16 : bool, optional
            FP16 flag (only forwarded to the parent fallback; the cuVS path
            always converts the data to float32).

        Returns
        -------
        object
            ``self`` on the cuVS path; whatever the parent implementation
            returns on the fallback path.

        Notes
        -----
        Private method, should not be called directly. Used by fit_transform().

        cuVS usage criteria:

        - no custom metric function (``self._metric_fn is None``)
        - ``self.use_cuvs`` is true
        - n_samples >= 10,000
        - n_features <= 2,048

        Metric names 'euclidean', 'l2' and 'sqeuclidean' map to the cuVS
        'euclidean' metric and 'inner_product' maps to 'inner_product'; any
        other string in ``self.metric_spec`` leaves the default 'euclidean'.

        Side Effects
        ------------
        Sets ``self._knn_indices`` and ``self._knn_distances`` (first returned
        neighbor dropped as the self-match). ``self.cuvs_index`` is reset to
        None and the CuPy memory pool is freed afterwards, including when the
        build or search raises.
        """
        n_samples, n_dims = X.shape

        # Custom metric expressions/callables cannot be handed to cuVS, which
        # only takes named metrics like 'euclidean', 'inner_product'.
        if self._metric_fn is not None:
            self.logger.warning(
                "Custom metric expressions/callables not supported by cuVS. "
                "Falling back to PyTorch backend for k-NN."
            )
            return super()._compute_knn(X, chunk_size, use_fp16)

        # Decide whether to use cuVS
        use_cuvs_for_this = (
            self.use_cuvs and
            n_samples >= 10000 and  # cuVS overhead not worth it for small datasets
            n_dims <= 2048  # cuVS works best for moderate dimensions
        )

        if not use_cuvs_for_this:
            # Fall back to PyTorch implementation
            self.logger.info("Using PyTorch backend for k-NN")
            return super()._compute_knn(X, chunk_size, use_fp16)

        # Use cuVS for k-NN
        self.logger.info(f"Computing {self.n_neighbors}-NN graph using cuVS...")

        # Default to euclidean, but allow named metrics.
        cuvs_metric = 'euclidean'
        if self.metric_spec is not None and isinstance(self.metric_spec, str):
            metric_lower = self.metric_spec.strip().lower()
            if metric_lower in ('euclidean', 'l2', 'sqeuclidean'):
                cuvs_metric = 'euclidean'
            elif metric_lower == 'inner_product':
                cuvs_metric = 'inner_product'
            # Note: cuVS doesn't support cosine directly, would need normalization

        # cuVS requires float32 (not float16) and C-contiguous (row-major) arrays.
        X_gpu = cp.asarray(X, dtype=cp.float32, order='C')

        try:
            index_type = self._select_cuvs_index_type(n_samples, n_dims)

            if index_type != 'flat':
                self.cuvs_index = self._build_cuvs_index(X_gpu, index_type, cuvs_metric)
            else:
                self.cuvs_index = None

            distances, indices = self._search_cuvs(
                self.cuvs_index, index_type, X_gpu, self.n_neighbors, cuvs_metric
            )

            # Wrap as CuPy arrays, drop the first neighbor (self) and copy to host.
            self._knn_indices = cp.asnumpy(cp.asarray(indices)[:, 1:])
            self._knn_distances = cp.asnumpy(cp.asarray(distances)[:, 1:])
            del distances, indices

            self.logger.info(f"k-NN graph computed: shape {self._knn_indices.shape}")
        finally:
            # Release GPU memory whether or not the build/search succeeded.
            del X_gpu
            self.cuvs_index = None
            cp.get_default_memory_pool().free_all_blocks()

        return self

    def _initialize_embedding(self, X):
        """
        Initialize the embedding with cuML PCA/TruncatedSVD when enabled.

        Overrides the parent implementation. When ``self.use_cuml`` is true
        and ``self.init == 'pca'``, the projection is computed on the GPU;
        otherwise the parent ``_initialize_embedding`` is used.

        Parameters
        ----------
        X : numpy.ndarray
            Input high-dimensional data of shape (n_samples, n_features).
            Non-NumPy inputs are passed to cuML as-is.

        Returns
        -------
        torch.Tensor
            Initial embedding of shape (n_samples, n_components), moved to
            ``self.device`` (cuML path), or the parent's return value.

        Notes
        -----
        Private method, should not be called directly. Used by fit_transform().

        cuML path:

        - TruncatedSVD for more than 100 features, PCA otherwise
        - each component is centered and scaled to unit standard deviation on
          the GPU; components with zero standard deviation are only centered
        - the result is handed to PyTorch through DLPack

        Errors raised by cuML are not caught here; they propagate to the caller.
        """
        if not (self.use_cuml and self.init == 'pca'):
            # Fall back to the parent (CPU sklearn PCA) initialization
            return super()._initialize_embedding(X)

        self.logger.info("Initializing with cuML PCA (GPU-accelerated)")

        # Convert to CuPy array if needed
        if isinstance(X, np.ndarray):
            X_gpu = cp.asarray(X, dtype=cp.float32)
        else:
            X_gpu = X

        if X.shape[1] > 100:
            # TruncatedSVD for high-D to low-D reduction
            pca = cuTruncatedSVD(
                n_components=self.n_components,
                random_state=self.random_state
            )
        else:
            # Regular PCA for lower dimensions.
            # Note: cuPCA doesn't support random_state parameter
            pca = cuPCA(
                n_components=self.n_components
            )

        # Fit and transform on GPU; view the result as a CuPy array.
        embedding_cp = cp.asarray(pca.fit_transform(X_gpu))

        # Normalize on GPU. A constant component has std == 0; dividing by it
        # would fill the embedding with NaN/inf, so such components are left
        # centered but unscaled.
        embedding_cp -= embedding_cp.mean(axis=0)
        std = embedding_cp.std(axis=0)
        std[std == 0] = 1.0
        embedding_cp /= std

        # Hand the CuPy buffer to PyTorch through DLPack.
        from torch.utils.dlpack import from_dlpack  # pylint: disable=import-outside-toplevel
        embedding_torch = from_dlpack(embedding_cp.toDlpack())

        return embedding_torch.to(self.device)

    def fit_transform(self, X, y=None):
        """
        Fit the model and transform data with cuVS/cuML acceleration.

        Logs which k-NN backend is expected to be used and then delegates to
        the parent ``fit_transform``.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            High-dimensional input data to transform. Must expose ``.shape``.
        y : array-like of shape (n_samples,), optional
            Ignored. Present for scikit-learn API compatibility.

        Returns
        -------
        numpy.ndarray of shape (n_samples, n_components)
            Low-dimensional embedding of the input data.

        Notes
        -----
        The message logged here is a coarse hint based only on
        ``self.use_cuvs`` and the sample count (>= 10,000). The actual
        decision is made in ``_compute_knn``, which additionally requires at
        most 2,048 features and no custom metric function.

        Examples
        --------
        Large dataset with cuVS acceleration::

            import numpy as np
            from dire_rapids import DiReCuVS

            # 500K points, 1000 dimensions
            X = np.random.randn(500000, 1000)

            reducer = DiReCuVS(verbose=True)  # Will log backend selection
            embedding = reducer.fit_transform(X)
            # Output: "Using cuVS-accelerated backend for 500000 points"
        """
        n_points = X.shape[0]

        # Log backend being used
        if self.use_cuvs and n_points >= 10000:
            self.logger.info(f"Using cuVS-accelerated backend for {n_points} points")
        else:
            self.logger.info(f"Using PyTorch backend for {n_points} points")

        return super().fit_transform(X, y)