Source code for dire_rapids.atlas_cpu

"""
CPU implementation of atlas topology computation.

Uses sklearn for kNN, numpy for arrays, and scipy sparse matrices.
"""

import numpy as np
from scipy import sparse
from scipy.sparse.linalg import eigsh
from sklearn.neighbors import NearestNeighbors


[docs] def compute_h0_h1_atlas_cpu(data, k_neighbors=20, density_threshold=0.8, overlap_factor=1.5, return_distances=False): """ CPU implementation of atlas-based H0/H1 computation. Build dense local triangulations around each point, then merge consistently. Parameters ---------- data : array-like Point cloud data (n_samples, n_features) k_neighbors : int Size of local neighborhood density_threshold : float Percentile threshold for edge inclusion (0-1) overlap_factor : float Factor for expanding local neighborhoods return_distances : bool If True, also return edge-to-distance mapping Returns ------- tuple : (h0_diagram, h1_diagram) or (h0_diagram, h1_diagram, edge_distances) """ data = np.asarray(data, dtype=np.float32) n = len(data) if n <= 2: result = (np.array([[0, np.inf]]), np.array([[0, 0]])) if return_distances: return result + ({},) return result # Build kNN graph k_neighbors = min(k_neighbors, n - 1) nn = NearestNeighbors(n_neighbors=k_neighbors + 1, metric='euclidean') nn.fit(data) distances, indices = nn.kneighbors(data) # Track edge distances for persistence edge_distances: dict = {} if return_distances else None # type: ignore[assignment] # Global edge and triangle sets global_edges = set() global_triangles = set() # Build atlas patches for i in range(n): local_neighborhood = indices[i, 1:] # Exclude self local_dists = distances[i, 1:] # Edges from i to neighbors for j_idx, j in enumerate(local_neighborhood): edge = tuple(sorted([i, int(j)])) global_edges.add(edge) if return_distances and edge not in edge_distances: # type: ignore[operator] edge_distances[edge] = local_dists[j_idx] # type: ignore[index] # Dense local patch: connect neighbors if close enough dist_threshold = np.percentile(local_dists, density_threshold * 100) * overlap_factor # Vectorized distance computation within neighborhood if len(local_neighborhood) > 1: neighborhood_coords = data[local_neighborhood] for idx_j, j_val in enumerate(local_neighborhood): j = int(j_val) # Vectorized distance from j to all k > j dists_jk = np.linalg.norm( neighborhood_coords[idx_j+1:] - neighborhood_coords[idx_j], axis=1 ) close_enough = dists_jk < dist_threshold for offset, is_close in enumerate(close_enough): if is_close: k = int(local_neighborhood[idx_j + 1 + offset]) edge = tuple(sorted([j, k])) global_edges.add(edge) if return_distances and edge not in edge_distances: # type: ignore[operator] edge_distances[edge] = dists_jk[offset] # type: ignore[index] # Build triangles for idx_j, j_val in enumerate(local_neighborhood): for idx_k in range(idx_j + 1, len(local_neighborhood)): j = int(j_val) k = int(local_neighborhood[idx_k]) e1 = tuple(sorted([i, j])) e2 = tuple(sorted([i, k])) e3 = tuple(sorted([j, k])) if e1 in global_edges and e2 in global_edges and e3 in global_edges: global_triangles.add(tuple(sorted([i, j, k]))) edges = sorted(list(global_edges)) triangles = sorted(list(global_triangles)) n_edges = len(edges) n_triangles = len(triangles) edge_to_idx = {e: idx for idx, e in enumerate(edges)} # Build B1 (edge boundary operator) B1_rows, B1_cols, B1_data = [], [], [] for e_idx, (v0, v1) in enumerate(edges): B1_rows.extend([v0, v1]) B1_cols.extend([e_idx, e_idx]) B1_data.extend([-1, +1]) B1 = sparse.coo_matrix((B1_data, (B1_rows, B1_cols)), shape=(n, n_edges), dtype=np.float64) B1 = B1.tocsr().astype(np.float64) # Build B2 (triangle boundary operator) if n_triangles > 0: B2_rows, B2_cols, B2_data = [], [], [] for t_idx, (i, j, k) in enumerate(triangles): e1 = tuple(sorted([i, j])) e2 = tuple(sorted([i, k])) e3 = tuple(sorted([j, k])) if e1 in edge_to_idx: B2_rows.append(edge_to_idx[e1]) B2_cols.append(t_idx) B2_data.append(+1 if i < j else -1) if e2 in edge_to_idx: B2_rows.append(edge_to_idx[e2]) B2_cols.append(t_idx) B2_data.append(-1 if i < k else +1) if e3 in edge_to_idx: B2_rows.append(edge_to_idx[e3]) B2_cols.append(t_idx) B2_data.append(+1 if j < k else -1) B2 = sparse.coo_matrix((B2_data, (B2_rows, B2_cols)), shape=(n_edges, n_triangles), dtype=np.float64) B2 = B2.tocsr().astype(np.float64) # Hodge Laplacian L1 = B1^T @ B1 + B2 @ B2^T L1 = (B1.T @ B1 + B2 @ B2.T).astype(np.float64) else: L1 = (B1.T @ B1).astype(np.float64) # Build L0 for H0 (connected components) adj_rows, adj_cols, adj_data = [], [], [] for v0, v1 in edges: adj_rows.extend([v0, v1]) adj_cols.extend([v1, v0]) adj_data.extend([1.0, 1.0]) A = sparse.coo_matrix((adj_data, (adj_rows, adj_cols)), shape=(n, n)) A = A.tocsr() deg = np.array(A.sum(axis=1)).flatten() D = sparse.diags(deg) L0 = D - A # Compute eigenvalues n_eigs_h0 = min(50, n - 2) n_eigs_h1 = min(50, n_edges - 2) if n_edges > 2 else 0 try: eigs_h0, _ = eigsh(L0, k=n_eigs_h0, which='SM', tol=1e-4) eigs_h0 = np.abs(eigs_h0) except Exception: # pylint: disable=broad-exception-caught eigs_h0 = np.array([]) if n_eigs_h1 > 0: try: eigs_h1, _ = eigsh(L1, k=n_eigs_h1, which='SM', tol=1e-4) eigs_h1 = np.abs(eigs_h1) except Exception: # pylint: disable=broad-exception-caught eigs_h1 = np.array([]) else: eigs_h1 = np.array([]) # Count zero eigenvalues (Betti numbers) beta_0 = np.sum(eigs_h0 < 1e-6) if len(eigs_h0) > 0 else 1 beta_1 = np.sum(eigs_h1 < 1e-6) if len(eigs_h1) > 0 else 0 # Build persistence diagrams h0_features = [[0.0, np.inf] for _ in range(beta_0)] h1_features = [[0.0, np.inf] for _ in range(beta_1)] h0_diagram = np.array(h0_features) if h0_features else np.array([[0, 0]]) h1_diagram = np.array(h1_features) if h1_features else np.array([[0, 0]]) if return_distances: return h0_diagram, h1_diagram, edge_distances return h0_diagram, h1_diagram