# dire_cuvs.py
"""
DIRE with cuVS backend for GPU-accelerated k-NN at scale.
This module provides optional cuVS integration for massive datasets.
Falls back to PyTorch if cuVS is not available.
Requirements:
Follow the installation instructions at https://docs.rapids.ai/install/
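Example (illustrative)::

    from dire_rapids import DiReCuVS
    reducer = DiReCuVS()  # auto-detects cuVS/cuML availability
    embedding = reducer.fit_transform(X)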
"""
import numpy as np
import torch # pylint: disable=unused-import # Used via parent class (self.device from DiRePyTorch)
from loguru import logger
# Import base DIRE PyTorch implementation
from .dire_pytorch import DiRePyTorch # pylint: disable=cyclic-import
# Try to import cuVS and CuPy
try:
import cupy as cp
from cuvs.neighbors import cagra, ivf_pq, ivf_flat
CUVS_AVAILABLE = True
logger.info("cuVS available - GPU-accelerated k-NN enabled")
except ImportError:
CUVS_AVAILABLE = False
logger.warning("cuVS not available. Install RAPIDS for GPU-accelerated k-NN: "
"Follow the installation instructions at https://docs.rapids.ai/install/")
# Try to import cuML for GPU-accelerated PCA
try:
from cuml.decomposition import PCA as cuPCA
from cuml.decomposition import TruncatedSVD as cuTruncatedSVD
CUML_AVAILABLE = True
logger.info("cuML available - GPU-accelerated PCA enabled")
except ImportError:
CUML_AVAILABLE = False
if CUVS_AVAILABLE:
logger.warning("cuML not available but cuVS is. PCA will run on CPU.")
class DiReCuVS(DiRePyTorch):
"""
RAPIDS cuVS/cuML accelerated implementation of DiRe for massive datasets.
This class extends DiRePyTorch with optional RAPIDS cuVS (CUDA Vector Search)
integration for GPU-accelerated k-nearest neighbors computation and cuML
integration for GPU-accelerated PCA initialization. It provides substantial
performance improvements for large-scale datasets.
Performance Advantages over PyTorch/PyKeOps
-------------------------------------------
    - **10-100x faster k-NN** for large datasets (>100K points)
    - **Massive scale**: handles 10M+ points efficiently
    - **High accuracy**: approximate k-NN with >95% recall
    - **Multi-GPU ready**: supports extreme-scale processing
    - **GPU-accelerated PCA**: cuML PCA/SVD for initialization
Automatic Fallback
------------------
Falls back to PyTorch backend if cuVS is not available, ensuring
compatibility across different environments.
Parameters
----------
use_cuvs : bool or None, default=None
Whether to use cuVS for k-NN computation. If None, automatically
detected based on availability and hardware.
use_cuml : bool or None, default=None
Whether to use cuML for PCA initialization. If None, automatically
detected based on availability and hardware.
cuvs_index_type : {'auto', 'ivf_flat', 'ivf_pq', 'cagra', 'flat'}, default='auto'
Type of cuVS index to build:
- 'auto': Automatically select based on data characteristics
- 'ivf_flat': Inverted file index without compression
- 'ivf_pq': Inverted file index with product quantization
- 'cagra': Graph-based index for very large datasets
- 'flat': Brute-force exact search
cuvs_build_params : dict, optional
Custom parameters for cuVS index building. Overrides defaults.
cuvs_search_params : dict, optional
Custom parameters for cuVS search. Overrides defaults.
*args, **kwargs
Additional arguments passed to DiRePyTorch parent class.
Includes: n_components, n_neighbors, init, max_iter_layout, min_dist,
spread, cutoff, neg_ratio, verbose, random_state, use_exact_repulsion,
metric (custom distance function for k-NN computation).
Attributes
----------
use_cuvs : bool
Whether cuVS backend is enabled and available.
use_cuml : bool
Whether cuML backend is enabled and available.
cuvs_index : object or None
Built cuVS index for k-NN search.
Examples
--------
Basic usage with automatic backend selection::
from dire_rapids import DiReCuVS
import numpy as np
# Large dataset
X = np.random.randn(100000, 512)
# Auto-detect cuVS/cuML availability
reducer = DiReCuVS()
embedding = reducer.fit_transform(X)
Force cuVS with custom index parameters::
reducer = DiReCuVS(
use_cuvs=True,
cuvs_index_type='ivf_pq',
cuvs_build_params={'n_lists': 2048, 'pq_dim': 64}
)
Massive dataset processing::
# 10M points, 1000 dimensions
X = np.random.randn(10_000_000, 1000)
reducer = DiReCuVS(
use_cuvs=True,
use_cuml=True,
cuvs_index_type='cagra', # Best for very large datasets
n_neighbors=32
)
embedding = reducer.fit_transform(X)
With custom distance metric::
# cuVS with L1 distance for k-NN computation
reducer = DiReCuVS(
use_cuvs=True,
metric='(x - y).abs().sum(-1)', # L1/Manhattan distance
n_neighbors=32,
cuvs_index_type='ivf_flat'
)
embedding = reducer.fit_transform(X)
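    Checking backend availability explicitly (a sketch; assumes the module
    path ``dire_rapids.dire_cuvs`` for the availability flags)::

        from dire_rapids.dire_cuvs import CUVS_AVAILABLE, CUML_AVAILABLE

        reducer = DiReCuVS(use_cuvs=CUVS_AVAILABLE, use_cuml=CUML_AVAILABLE)
        embedding = reducer.fit_transform(X)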
Notes
-----
**Requirements:**
- RAPIDS cuVS: Follow the installation instructions at https://docs.rapids.ai/install/
- CUDA-capable GPU with compute capability >= 6.0
**Index Selection Guidelines:**
- < 50K points: 'flat' (exact search)
- 50K-500K points: 'ivf_flat'
- 500K-5M points: 'ivf_pq'
- > 5M points: 'cagra' (if dimensions <= 500)
**Memory Considerations:**
- cuVS requires float32 precision (no FP16 support)
- Index building requires additional GPU memory
- 'cagra' uses more memory but provides best performance for huge datasets
"""
def __init__(
self,
*args,
use_cuvs=None, # Auto-detect by default
use_cuml=None, # Auto-detect by default
cuvs_index_type='auto', # 'auto', 'ivf_flat', 'ivf_pq', 'cagra'
cuvs_build_params=None,
cuvs_search_params=None,
**kwargs
):
"""
Initialize DiReCuVS with cuVS and cuML backend configuration.
Parameters
----------
*args
Positional arguments passed to DiRePyTorch parent class.
        use_cuvs : bool or None, default=None
            Whether to use cuVS for k-NN computation:
            - None: Auto-detect based on availability and GPU presence
            - True: Request cuVS; falls back to PyTorch with a warning if unavailable
            - False: Disable cuVS, use PyTorch backend
        use_cuml : bool or None, default=None
            Whether to use cuML for PCA initialization:
            - None: Auto-detect based on availability and GPU presence
            - True: Request cuML; falls back to sklearn with a warning if unavailable
            - False: Disable cuML, use sklearn backend
cuvs_index_type : {'auto', 'ivf_flat', 'ivf_pq', 'cagra', 'flat'}, default='auto'
Type of cuVS index to build:
- 'auto': Automatically select optimal index based on data size/dimensionality
- 'ivf_flat': Inverted file index without compression (good balance)
- 'ivf_pq': Inverted file with product quantization (memory efficient)
- 'cagra': Graph-based index (best for very large datasets)
- 'flat': Brute-force exact search (small datasets only)
cuvs_build_params : dict, optional
Custom parameters for cuVS index building. These override the
automatically determined parameters. See cuVS documentation for
index-specific parameters.
cuvs_search_params : dict, optional
Custom parameters for cuVS search operations. These override the
automatically determined parameters. See cuVS documentation for
index-specific search parameters.
**kwargs
Additional keyword arguments passed to DiRePyTorch parent class.
See DiRePyTorch documentation for available parameters including:
n_components, n_neighbors, init, max_iter_layout, min_dist, spread,
cutoff, neg_ratio, verbose, random_state, use_exact_repulsion,
metric (custom distance function for k-NN computation).
        Notes
        -----
        If cuVS or cuML are explicitly requested but unavailable, a warning is
        logged and the PyTorch/sklearn fallback is used; no exception is raised.
"""
super().__init__(*args, **kwargs)
# Seed CuPy random generator for cuML/cuVS operations (if available)
if CUVS_AVAILABLE:
cp.random.seed(self.random_state)
if self.verbose:
self.logger.debug(f"Seeded CuPy random generator: {self.random_state}")
        # Auto-detect cuVS usage
        if use_cuvs is None:
            # Use cuVS if available and we have a GPU
            self.use_cuvs = CUVS_AVAILABLE and self.device.type == 'cuda'
        else:
            self.use_cuvs = use_cuvs and CUVS_AVAILABLE
        if self.use_cuvs:
            self.logger.info("cuVS backend enabled for k-NN computation")
        elif use_cuvs and not CUVS_AVAILABLE:
            self.logger.warning("cuVS requested but not available, falling back to PyTorch")
        # Auto-detect cuML usage for PCA
        if use_cuml is None:
            self.use_cuml = CUML_AVAILABLE and self.device.type == 'cuda'
        else:
            self.use_cuml = use_cuml and CUML_AVAILABLE
        if self.use_cuml:
            self.logger.info("cuML backend enabled for PCA initialization")
        elif use_cuml and not CUML_AVAILABLE:
            self.logger.warning("cuML requested but not available, falling back to sklearn")
self.cuvs_index_type = cuvs_index_type
self.cuvs_build_params = cuvs_build_params
self.cuvs_search_params = cuvs_search_params
self.cuvs_index = None
def _select_cuvs_index_type(self, n_samples, n_dims):
"""
Automatically select optimal cuVS index type based on data characteristics.
This private method uses heuristics to select the most appropriate cuVS index
type based on dataset size, dimensionality, and performance trade-offs.
Parameters
----------
n_samples : int
Number of samples in the dataset.
n_dims : int
Number of dimensions/features per sample.
Returns
-------
str
Selected cuVS index type ('flat', 'ivf_flat', 'ivf_pq', or 'cagra').
Notes
-----
Private method, should not be called directly. Used by _compute_knn().
Selection Heuristics:
- **< 50K samples**: 'flat' (exact search)
- **50K-500K samples or >500D**: 'ivf_flat' (good balance)
- **500K-5M samples**: 'ivf_pq' (memory efficient)
- **> 5M samples and ≤500D**: 'cagra' (best performance)
- **> 5M samples and >500D**: 'ivf_pq' (high-D fallback)
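        Illustrative mapping under these heuristics (derived from the code below,
        assuming ``cuvs_index_type='auto'``)::

            reducer._select_cuvs_index_type(30_000, 64)       # -> 'flat'
            reducer._select_cuvs_index_type(100_000, 1000)    # -> 'ivf_flat'
            reducer._select_cuvs_index_type(1_000_000, 128)   # -> 'ivf_pq'
            reducer._select_cuvs_index_type(10_000_000, 256)  # -> 'cagra'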
"""
if self.cuvs_index_type != 'auto':
return self.cuvs_index_type
# Decision tree based on scale and dimensionality
# For high dimensions (>500), prefer IVF methods over graph-based
        if n_samples < 50000:
            # Small dataset - exact search ('flat'; implemented via IVF-Flat
            # with a high probe count in _search_cuvs)
            return 'flat'
if n_samples < 500000 or n_dims > 500:
# Medium dataset or high-D - IVF without compression
# IVF-Flat works better than CAGRA for high dimensions
return 'ivf_flat'
if n_samples < 5000000:
# Large dataset - IVF with compression
return 'ivf_pq'
# Very large dataset with moderate dimensions - graph-based
return 'cagra' if n_dims <= 500 else 'ivf_pq'
def _build_cuvs_index(self, X_gpu, index_type, metric='euclidean'):
"""
Build cuVS index for fast k-NN search.
This private method constructs the appropriate cuVS index based on the
specified index type and data characteristics, with optimized parameters
for each index variant.
Parameters
----------
X_gpu : cupy.ndarray
Input data on GPU, shape (n_samples, n_features), dtype float32.
index_type : str
Type of index to build ('flat', 'ivf_flat', 'ivf_pq', 'cagra').
metric : str, default='euclidean'
Distance metric to use ('euclidean' or 'inner_product').
Returns
-------
cuVS index object or None
Built cuVS index ready for search operations.
Returns None for 'flat' type (no index needed).
Notes
-----
Private method, should not be called directly. Used by _compute_knn().
Index-Specific Optimizations:
- **IVF-Flat**: Adaptive n_lists based on dataset size and dimensionality
- **IVF-PQ**: Optimized PQ dimension and quantization parameters
- **CAGRA**: Graph-based parameters tuned for large datasets
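        Overriding the defaults via ``cuvs_build_params`` (a sketch; keys must be
        valid ``IndexParams`` constructor arguments for the chosen index type)::

            reducer = DiReCuVS(
                use_cuvs=True,
                cuvs_index_type='ivf_pq',
                cuvs_build_params={'n_lists': 4096, 'pq_dim': 96},
            )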
Raises
------
ValueError
If unknown index_type is specified.
"""
n_samples, n_dims = X_gpu.shape
self.logger.info(f"Building cuVS {index_type} index for {n_samples} points in {n_dims}D with {metric} metric...")
if index_type == 'flat':
# Exact search - no index needed
self.logger.info("Using brute-force search (exact)")
return None
if index_type == 'ivf_flat':
# IVF without compression
# For high-D data, use more lists for better quantization
if n_dims > 500:
# High-D: more lists help with curse of dimensionality
n_lists = min(int(np.sqrt(n_samples) * 2), 8192)
else:
n_lists = min(int(np.sqrt(n_samples)), 4096)
            params = {'n_lists': n_lists, 'metric': metric, 'add_data_on_build': True}
            if self.cuvs_build_params:
                # Merge user overrides before constructing the params object,
                # which is not dict-like and cannot be updated in place
                params.update(self.cuvs_build_params)
            build_params = ivf_flat.IndexParams(**params)
            index = ivf_flat.build(build_params, X_gpu)
            self.logger.info(f"Built IVF-Flat index with {params['n_lists']} lists for {n_dims}D data")
elif index_type == 'ivf_pq':
# IVF with product quantization
n_lists = min(int(np.sqrt(n_samples)), 8192)
pq_dim = min(n_dims // 4, 128) # Reasonable PQ dimension
            params = {'n_lists': n_lists, 'metric': metric, 'pq_dim': pq_dim,
                      'pq_bits': 8, 'add_data_on_build': True}
            if self.cuvs_build_params:
                # Merge user overrides before constructing the params object
                params.update(self.cuvs_build_params)
            build_params = ivf_pq.IndexParams(**params)
            index = ivf_pq.build(build_params, X_gpu)
            self.logger.info(f"Built IVF-PQ index with {params['n_lists']} lists, PQ dim={params['pq_dim']}")
elif index_type == 'cagra':
# Graph-based index for very large datasets
            params = {'metric': metric,
                      'graph_degree': 32,
                      'intermediate_graph_degree': 64,
                      'graph_build_algo': 'nn_descent'}
            if self.cuvs_build_params:
                # Merge user overrides before constructing the params object
                params.update(self.cuvs_build_params)
            build_params = cagra.IndexParams(**params)
            index = cagra.build(build_params, X_gpu)
            self.logger.info("Built CAGRA graph-based index")
else:
raise ValueError(f"Unknown index type: {index_type}")
return index
def _search_cuvs(self, index, index_type, X_gpu, k, metric='euclidean'):
"""
Search cuVS index for k nearest neighbors.
This private method performs k-NN search using the built cuVS index,
with optimized search parameters for each index type.
Parameters
----------
index : cuVS index object or None
Built cuVS index from _build_cuvs_index().
index_type : str
Type of index being searched ('flat', 'ivf_flat', 'ivf_pq', 'cagra').
X_gpu : cupy.ndarray
Query data on GPU, shape (n_samples, n_features), dtype float32.
k : int
            Number of nearest neighbors to find. One extra neighbor (k+1) is
            retrieved because each point returns itself as its nearest match.
metric : str, default='euclidean'
Distance metric to use ('euclidean' or 'inner_product').
Returns
-------
tuple of cupy.ndarray
distances : cupy.ndarray of shape (n_samples, k+1)
Distances to k+1 nearest neighbors (including self).
indices : cupy.ndarray of shape (n_samples, k+1)
Indices of k+1 nearest neighbors (including self).
Notes
-----
Private method, should not be called directly. Used by _compute_knn().
Search Parameters:
- **IVF methods**: Adaptive n_probes based on index size
- **CAGRA**: Optimized search width and internal parameters
- **Flat**: Uses IVF-Flat with high probe count for near-exact results
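        Overriding the defaults via ``cuvs_search_params`` (a sketch; keys must be
        valid ``SearchParams`` arguments for the chosen index type)::

            reducer = DiReCuVS(
                use_cuvs=True,
                cuvs_index_type='ivf_flat',
                cuvs_search_params={'n_probes': 128},
            )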
Raises
------
ValueError
If unknown index_type is specified.
"""
n_samples = X_gpu.shape[0]
self.logger.info(f"Searching for {k} nearest neighbors using cuVS {index_type}...")
if index_type == 'flat':
# For flat/brute force, just use IVF-Flat with many lists for exact search
# This avoids dtype issues with brute_force module
n_lists = min(int(np.sqrt(n_samples)), 1024)
build_params = ivf_flat.IndexParams(
n_lists=n_lists,
metric=metric,
add_data_on_build=True
)
index = ivf_flat.build(build_params, X_gpu)
# Search with high probe count for near-exact results
search_params = ivf_flat.SearchParams(
n_probes=min(n_lists, 256) # High probe count for accuracy
)
distances, indices = ivf_flat.search(
search_params, index, X_gpu, k+1
)
elif index_type == 'ivf_flat':
# IVF search
            params = {'n_probes': min(index.n_lists // 10, 100)}
            if self.cuvs_search_params:
                # Merge user overrides before constructing the params object
                params.update(self.cuvs_search_params)
            search_params = ivf_flat.SearchParams(**params)
            distances, indices = ivf_flat.search(
                search_params, index, X_gpu, k+1
            )
elif index_type == 'ivf_pq':
# IVF-PQ search
            params = {'n_probes': min(index.n_lists // 10, 200)}
            if self.cuvs_search_params:
                params.update(self.cuvs_search_params)
            search_params = ivf_pq.SearchParams(**params)
            distances, indices = ivf_pq.search(
                search_params, index, X_gpu, k+1
            )
elif index_type == 'cagra':
# CAGRA search
            params = {'max_queries': 0,  # 0 = automatic
                      'itopk_size': min(k * 2, 256),
                      'search_width': 4}
            if self.cuvs_search_params:
                params.update(self.cuvs_search_params)
            search_params = cagra.SearchParams(**params)
            distances, indices = cagra.search(
                search_params, index, X_gpu, k+1
            )
else:
raise ValueError(f"Unknown index type: {index_type}")
return distances, indices
def _compute_knn(self, X, chunk_size=50000, use_fp16=None):
"""
Compute k-NN using cuVS acceleration when available and beneficial.
This method overrides the parent implementation to use cuVS for k-NN
computation when it provides performance benefits, automatically falling
back to PyTorch for cases where cuVS isn't optimal.
Parameters
----------
X : numpy.ndarray
Input data of shape (n_samples, n_features).
chunk_size : int, default=50000
Chunk size for processing (used by fallback PyTorch method).
use_fp16 : bool, optional
Use FP16 precision (used by fallback PyTorch method).
Note: cuVS requires float32, so FP16 is only used for PyTorch fallback.
Notes
-----
Private method, should not be called directly. Used by fit_transform().
cuVS Usage Criteria:
- cuVS backend must be enabled and available
- Dataset size >= 10,000 samples (cuVS overhead not worth it for smaller datasets)
- Dimensionality <= 2,048 (cuVS works best for moderate dimensions)
- Only native metrics supported (euclidean, inner_product)
If criteria aren't met, falls back to parent PyTorch implementation.
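        Forcing the PyTorch path regardless of dataset size (illustrative)::

            reducer = DiReCuVS(use_cuvs=False)  # k-NN always uses the parent backend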
Side Effects
------------
Sets self._knn_indices and self._knn_distances with computed k-NN graph.
Cleans up GPU memory after computation.
"""
n_samples, n_dims = X.shape
# Check if custom metric expression/callable is specified
# cuVS only supports named metrics like 'euclidean', 'inner_product'
if self._metric_fn is not None:
self.logger.warning(
"Custom metric expressions/callables not supported by cuVS. "
"Falling back to PyTorch backend for k-NN."
)
return super()._compute_knn(X, chunk_size, use_fp16)
# Decide whether to use cuVS
use_cuvs_for_this = (
self.use_cuvs and
n_samples >= 10000 and # cuVS overhead not worth it for small datasets
n_dims <= 2048 # cuVS works best for moderate dimensions
)
if not use_cuvs_for_this:
# Fall back to PyTorch implementation
self.logger.info("Using PyTorch backend for k-NN")
return super()._compute_knn(X, chunk_size, use_fp16)
# Use cuVS for k-NN
self.logger.info(f"Computing {self.n_neighbors}-NN graph using cuVS...")
# Determine which metric to use for cuVS
# Default to euclidean, but allow named metrics
cuvs_metric = 'euclidean'
if self.metric_spec is not None and isinstance(self.metric_spec, str):
metric_lower = self.metric_spec.strip().lower()
if metric_lower in ('euclidean', 'l2', 'sqeuclidean'):
cuvs_metric = 'euclidean'
elif metric_lower == 'inner_product':
cuvs_metric = 'inner_product'
# Note: cuVS doesn't support cosine directly, would need normalization
# Convert to CuPy array
# Note: cuVS requires float32, not float16
# cuVS also requires C-contiguous (row-major) arrays
X_gpu = cp.asarray(X, dtype=cp.float32, order='C')
# Select index type
index_type = self._select_cuvs_index_type(n_samples, n_dims)
# Build index
if index_type != 'flat':
self.cuvs_index = self._build_cuvs_index(X_gpu, index_type, cuvs_metric)
else:
self.cuvs_index = None
# Search for k-NN
distances, indices = self._search_cuvs(
self.cuvs_index, index_type, X_gpu, self.n_neighbors, cuvs_metric
)
# Convert to CuPy arrays first, then remove self (first neighbor) and convert to numpy
indices_cp = cp.asarray(indices)
distances_cp = cp.asarray(distances)
self._knn_indices = cp.asnumpy(indices_cp[:, 1:])
self._knn_distances = cp.asnumpy(distances_cp[:, 1:])
self.logger.info(f"k-NN graph computed: shape {self._knn_indices.shape}")
# Clean up GPU memory
del X_gpu
if self.cuvs_index is not None:
del self.cuvs_index
self.cuvs_index = None
cp.get_default_memory_pool().free_all_blocks()
return self
def _initialize_embedding(self, X):
"""
Initialize embedding using cuML PCA when available, with sklearn fallback.
This method overrides the parent implementation to use GPU-accelerated
cuML PCA/TruncatedSVD for initialization when available, providing
significant speedups for high-dimensional data.
Parameters
----------
X : numpy.ndarray
Input high-dimensional data of shape (n_samples, n_features).
Returns
-------
torch.Tensor
Initial embedding of shape (n_samples, n_components) on the target device.
Notes
-----
Private method, should not be called directly. Used by fit_transform().
cuML Usage:
- Uses TruncatedSVD for high-dimensional data (>100 features) for efficiency
- Uses regular PCA for lower-dimensional data
- Performs normalization on GPU before converting to PyTorch
- Uses DLPack for zero-copy GPU tensor transfer
Falls back to parent sklearn-based initialization if:
- cuML is not available or disabled
- Initialization method is not 'pca'
- Any errors occur during cuML processing
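        Example (illustrative)::

            reducer = DiReCuVS(use_cuml=True, init='pca', n_components=2)
            embedding = reducer.fit_transform(X)  # PCA initialization runs on GPU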
"""
        if self.use_cuml and self.init == 'pca':
            self.logger.info("Initializing with cuML PCA (GPU-accelerated)")
            try:
                # Convert to CuPy array if needed
                if isinstance(X, np.ndarray):
                    X_gpu = cp.asarray(X, dtype=cp.float32)
                else:
                    X_gpu = X
                # Use TruncatedSVD for high-dimensional data (more efficient)
                if X.shape[1] > 100:
                    # TruncatedSVD is well suited to high-D to low-D reduction
                    pca = cuTruncatedSVD(
                        n_components=self.n_components,
                        random_state=self.random_state
                    )
                else:
                    # Regular PCA for lower dimensions
                    # Note: cuPCA doesn't support the random_state parameter
                    pca = cuPCA(
                        n_components=self.n_components
                    )
                # Fit and transform on GPU; cuML returns a CuPy array
                embedding_gpu = pca.fit_transform(X_gpu)
                embedding_cp = cp.asarray(embedding_gpu)
                # Normalize on GPU (zero mean, unit std per component)
                embedding_cp -= embedding_cp.mean(axis=0)
                embedding_cp /= embedding_cp.std(axis=0)
                # Zero-copy transfer from CuPy to PyTorch via DLPack
                from torch.utils.dlpack import from_dlpack  # pylint: disable=import-outside-toplevel
                embedding_torch = from_dlpack(embedding_cp.toDlpack())
                return embedding_torch.to(self.device)
            except Exception as exc:  # pylint: disable=broad-except
                # Documented behavior: fall back to sklearn on any cuML failure
                self.logger.warning(f"cuML PCA failed ({exc}); falling back to sklearn")
        # Fall back to CPU sklearn PCA
        return super()._initialize_embedding(X)