diff --git a/python/cuvs_bench/cuvs_bench/generate_groundtruth/__main__.py b/python/cuvs_bench/cuvs_bench/generate_groundtruth/__main__.py index 4315c8e3ac..7dd82d3955 100644 --- a/python/cuvs_bench/cuvs_bench/generate_groundtruth/__main__.py +++ b/python/cuvs_bench/cuvs_bench/generate_groundtruth/__main__.py @@ -10,6 +10,7 @@ import warnings from .utils import ( + add_jitter, groundtruth_neighbors_filename, memmap_bin_file, offset_neighbor_indices, @@ -111,6 +112,22 @@ def choose_random_queries(dataset, n_queries): return dataset[query_idx, :] +def choose_random_queries_with_jitter(dataset, n_queries, seed=12345): + """Pick ``n_queries`` random rows from ``dataset`` and add Gaussian jitter + at scale ``0.1 * std(sample)``. + """ + import numpy as _np + + print("Choosing random vectors from dataset and jittering with noise") + rng = _np.random.default_rng(seed) + n_rows = dataset.shape[0] + # Sort indices so the memmap read is sequential rather than random-access. + query_idx = _np.sort(rng.choice(n_rows, size=n_queries, replace=False)) + sampled = dataset[query_idx, :].astype(_np.float32, copy=True) + + return add_jitter(sampled, rng, normalize=False) + + def cpu_search(dataset, queries, k, metric="squeclidean"): """ Find the k nearest neighbors for each query point in the dataset using the @@ -235,18 +252,22 @@ def main(): "The input and output files are in big-ann-benchmark's binary format.", epilog="""Example usage # With existing query file - python -m cuvs_bench.generate_groundtruth --dataset /dataset/base.\ -fbin --output=groundtruth_dir --queries=/dataset/query.public.10K.fbin + python -m cuvs_bench.generate_groundtruth /dataset/base.fbin \ +--output=groundtruth_dir --queries=/dataset/query.public.10K.fbin # With randomly generated queries - python -m cuvs_bench.generate_groundtruth --dataset /dataset/base.\ -fbin --output=groundtruth_dir --queries=random --n_queries=10000 + python -m cuvs_bench.generate_groundtruth /dataset/base.fbin \ +--output=groundtruth_dir --queries=random --n_queries=10000 # Using only a subset of the dataset. Define queries by randomly # selecting vectors from the (subset of the) dataset. - python -m cuvs_bench.generate_groundtruth --dataset /dataset/base.\ -fbin --nrows=2000000 --cols=128 --output=groundtruth_dir \ + python -m cuvs_bench.generate_groundtruth /dataset/base.fbin \ +--rows=2000000 --cols=128 --output=groundtruth_dir \ --queries=random-choice --n_queries=10000 + + # Jittered queries (following the logic of cuvs_bench.synthesize_dataset) + python -m cuvs_bench.generate_groundtruth /dataset/base.fbin \ +--output=groundtruth_dir --queries=random-jitter --n_queries=10000 """, formatter_class=argparse.RawDescriptionHelpFormatter, ) @@ -256,9 +277,11 @@ def main(): "--queries", type=str, default="random", - help="Queries file name, or one of 'random-choice' or 'random' " - "(default). 'random-choice': select n_queries vectors from the input " - "dataset. 'random': generate n_queries as uniform random numbers.", + help="Queries file name, or one of 'random-choice', 'random-jitter', " + "or 'random' (default). 'random-choice': select n_queries vectors " + "from the input dataset. 'random-jitter': same as 'random-choice', " + "but add std-relative Gaussian noise to each query. 'random': generate " + "n_queries as uniform random numbers.", ) parser.add_argument( "--output", @@ -341,7 +364,7 @@ def main(): if len(args.output) > 0: os.makedirs(args.output, exist_ok=True) - if args.queries == "random" or args.queries == "random-choice": + if args.queries in {"random", "random-choice", "random-jitter"}: if args.n_queries is None: raise RuntimeError( "n_queries must be given to generate random queries" @@ -352,9 +375,13 @@ def main(): ) elif args.queries == "random-choice": queries = choose_random_queries(dataset, args.n_queries) + elif args.queries == "random-jitter": + queries = choose_random_queries_with_jitter( + dataset, args.n_queries + ) queries_filename = os.path.join( - args.output, "queries" + suffix_from_dtype(dtype) + args.output, "queries" + suffix_from_dtype(queries.dtype) ) print("Writing queries file", queries_filename) write_bin(queries_filename, queries) diff --git a/python/cuvs_bench/cuvs_bench/generate_groundtruth/utils.py b/python/cuvs_bench/cuvs_bench/generate_groundtruth/utils.py index a561403a75..b46c9ed84e 100644 --- a/python/cuvs_bench/cuvs_bench/generate_groundtruth/utils.py +++ b/python/cuvs_bench/cuvs_bench/generate_groundtruth/utils.py @@ -10,6 +10,22 @@ from cuvs_bench._bin_format import read_bin_header, write_bin_header +def add_jitter( + queries: np.ndarray, + rng: np.random.Generator, + normalize: bool, +) -> np.ndarray: + """Add Gaussian jitter to query vectors and optionally re-normalize.""" + noise_scale = float(np.std(queries)) * 0.1 + queries = queries + rng.normal(0, noise_scale, queries.shape).astype( + np.float32 + ) + if normalize: + norms = np.linalg.norm(queries, axis=1, keepdims=True) + queries = queries / np.maximum(norms, 1e-8) + return queries.astype(np.float32) + + def dtype_from_filename(filename): ext = os.path.splitext(filename)[1] if ext == ".fbin": diff --git a/python/cuvs_bench/cuvs_bench/synthesize_dataset/README.md b/python/cuvs_bench/cuvs_bench/synthesize_dataset/README.md new file mode 100644 index 0000000000..e7640e2cd3 --- /dev/null +++ b/python/cuvs_bench/cuvs_bench/synthesize_dataset/README.md @@ -0,0 +1,322 @@ +# `cuvs_bench.synthesize_dataset` + +Generates a synthetic dataset of arbitrary size from a small sample of a real dataset, for benchmarking vector indexes at scales (100M, 1B, 10B+) where the real dataset isn't available at the target size. Purely random vectors aren't a substitute — the clustering structure, neighbor density, and intrinsic dimensionality of real embeddings are what dominate ANN index behavior, and random data has none of those. + +The synthetic dataset approximates the real dataset's behavior under ANN search rather than its raw vector content. When you build the same index on both and sweep the same search-effort knob (CAGRA's `itopk_size`, HNSW's `efSearch`, DiskANN's `L`, …), the recall–throughput Pareto curves track each other and relative algorithm rankings carry over. + +Ground truth is generated alongside the data and can be computed at any target size without storing the full dataset on disk. See [How it works](#how-it-works) for the mechanism. + + +# When to use this + +Use it when: + +- You want to benchmark a vector index at 100M / 1B / 10B+ but only have a small (~10M) real dataset. +- You want benchmark numbers (recall, throughput, build time) on the synthetic data that **carry over** to what you'd see on the real data at the same scale. +- The real dataset is sensitive or proprietary and can't be shared, but you want a collaborator (or vendor) to benchmark a vector index against something representative of it. Fit a fingerprint locally and share that small file (.npz) instead of the dataset — it stores only per-cluster aggregate statistics, never raw rows. The collaborator regenerates the synthetic dataset locally. See [How it works](#how-it-works) for what's in the fingerprint. + +**Don't** use it when: + +- You want to use the data for anything other than ANN-index benchmarking — training or fine-tuning embedding models, evaluating downstream ML tasks, distance-correctness tests against a real reference, or any workload that depends on the *actual vector content*. The synthetic vectors only preserve index-search behavior; they aren't a substitute for the real dataset in any other context. + +# How it works + +![synthesize_dataset pipeline overview](figures/pipeline.png) + +The `fit` step learns the small fingerprint highlighted in blue above. The `generate` step turns the fingerprint into a synthetic dataset of arbitrary size, and ground truth is computed alongside it: each query probes its `nprobes` nearest clusters, those clusters are regenerated deterministically from the fingerprint, and brute-force k-NN is run against just those candidates. This is what makes 10B+ scale GT tractable. Note that `nprobes` here is an internal parameter of the GT pipeline (how many clusters we probe to find true neighbors), not a search-time parameter of any specific ANN index. + +### What the fingerprint contains + +The `fit` step produces a single NPZ file holding only per-cluster aggregate statistics: + +- `centroids` — means over each cluster's points. +- `pca_components_arr` — eigenvectors of the per-cluster residual covariance (linear functionals of the cluster, not raw rows). +- `pca_explained_var_arr`, `pca_noise_var` — eigenvalues and residual variance scalars. +- `pca_n_components` — the requested number of PCA components. +- `variances_per_dim` — per-dim variance per cluster. +- `densities` — per-cluster point counts, normalized. +- `norm_unit` — single bool: whether the fit-time input was (≈) L2-unit-norm. +- `norm_quantiles` — per-cluster norm inverse-CDF: a `(n_clusters, 256)` grid of the real vector-norm distribution within each cluster. At generate time each vector draws a target norm from its cluster's grid (inverse-CDF sampling), so the synthetic data reproduces each cluster's real *radial spread*. + +No individual row of the original data is stored anywhere in the fingerprint, and the synthetic data emitted at generate-time is `centroid + RNG`, never a copy of a real row. + +### Query generation (random jitter) + +Queries are sampled from the dataset itself and perturbed with a small Gaussian, so they sit on the data manifold without being exact-match copies of database rows. The `generate` step produces them automatically alongside `base.fbin` and GT. + +If you also have the real dataset and want matching queries on it (e.g. for the synth-vs-real comparison in Step 7), run `cuvs_bench.generate_groundtruth --queries random-jitter` on it. + +# Requirements + +- `cuvs` (already a dependency of cuvs-bench) — provides KMeans, PCA, and brute-force k-NN. +- `cupy`, `numpy`, `tqdm`. +- A GPU. CPU-only operation isn't supported. + +# Step-by-step + +### Step 1 — Pick `sample_size`, `n_clusters`, and `ncomp` + +Two raw sizes feed the recipe: + +- **`total_rows`** — target size of the synthetic dataset you want to generate (e.g. 100M, 1B, 10B etc). +- **`sample_size`** — size of the real-data slice you fit the fingerprint on (passed to `fit`). + +Three knobs are derived from (or chosen alongside) those: + +- **SF** (scale factor) = `total_rows / sample_size` — how much we stretch the fitting sample to reach the target size. +- **PPC** (points per cluster) = `sample_size / n_clusters` — average number of real sample points per KMeans cluster. +- **`ncomp`** — number of PCA components fit per cluster, i.e. the rank of the low-rank Gaussian sampler used to draw synthetic points around each centroid. + +Use the recommended defaults: **SF=100, PPC=100, `ncomp=32`**. With those fixed, the only thing you choose is the **target size**, and everything else follows mechanically: + +```text +sample_size = target_size / 100 # SF = target_size / sample_size = 100 +n_clusters = sample_size / 100 # PPC = sample_size / n_clusters = 100 +ncomp = 32 # good middle ground at PPC=100 +``` + +So for a 100M target: `sample_size=1M`, `n_clusters=10K`, `ncomp=32`. For a 1B target: `sample_size=10M`, `n_clusters=100K`, `ncomp=32`. + +`ncomp=32` is the default because it sits in the empirical sweet spot at PPC=100 — large enough to capture the dominant intra-cluster correlations on typical embedding data, small enough to estimate reliably with 100 points per cluster. Above the ceiling listed below, recall degrades uniformly across the entire recall range; below ~16 the model is under-expressive at large target sizes. + +| Embedding dim | `ncomp` ceiling | Recommended `ncomp` | +|-----------------------|-----------------|---------------------| +| 1024d (e.g. Falcon) | ~48 | 32 | +| 768d (e.g. Wiki) | ~32 | 32 | +| 128d (e.g. BigANN) | ~32 | 24 | + + +### Step 2 — Fit the cluster fingerprint + +```bash +python -m cuvs_bench.synthesize_dataset fit \ + --dataset /path/to/your_real_data.fbin \ + --sample_size 1000000 \ + --n_clusters 10000 \ + --pca_components 32 \ + --output fingerprint.npz +``` + +The output `fingerprint.npz` can be reused for any target size. + +Supported input formats: `.fbin` (cuvs-bench layout), `.npy`, `.pkl`. + +### Step 3 — (Recommended) Verify `nprobes` at small scale + +`nprobes` is how many nearest clusters each ground-truth query probes when computing GT. We don't search the whole synthetic dataset for each query — that would be O(`total_rows`) per query and takes long at multi-billion scale. Instead, for each query we re-generate only the `nprobes` clusters whose centroids are closest to the query and run brute-force k-NN against just those points. + +By construction `nprobes ≤ n_clusters` (probing more clusters than exist is meaningless). **Recommended starting point: `nprobes ≈ 5% of n_clusters`.** For the fit recipe in Step 1 (`n_clusters=10K` at 100M targets, `100K` at 1B), that lands at `nprobes=500` and `5000` respectively. Empirically this keeps Step 3's recall above 0.999, i.e. the cheap nprobe GT and the exact GT agree on more than 99.9% of true neighbors. Higher `nprobes` is more accurate but linearly more expensive at GT time. + +> **What "recall" means in this step.** The recall reported by `verify` is *internal to the GT pipeline* — it measures how well the cheap nprobe GT agrees with the exact streaming GT. It is **not** the recall of any ANN index you'll later benchmark on the synthetic data. + +**`nprobes` requirements *shrink* with `total_rows`, not grow.** A fingerprint has a fixed `n_clusters`, so at larger `total_rows` each cluster holds proportionally more points (same `densities`, just more samples each). For a fixed `k`, a query's top-`k` true neighbors then concentrate in *fewer* nearby clusters than they would at a smaller scale — both because each cluster contributes more candidates and because the k-th nearest distance shrinks as overall density grows. So an `nprobes` calibrated at a small `total_rows` is a **safe upper bound** for the same fingerprint at larger target sizes — it can only over-probe at the larger scale, never under-probe. + +Before paying for the full-scale generation, run the cheap nprobe GT against the exact streaming GT at a small `total_rows` and check that they agree: + +```bash +python -m cuvs_bench.synthesize_dataset verify \ + --fingerprint fingerprint.npz \ + --total_rows 1000000 \ + --n_queries 1000 \ + --k 10 \ + --nprobes 500 # ~5% of n_clusters=10K +``` + +If GT-vs-GT recall ≥ 0.999, that `nprobes` is safe to reuse in Step 4 at any larger `total_rows` — Step 3 conservatively over-estimates what you actually need at scale, so the value carries over without re-tuning. If recall is lower, bump `nprobes` and re-run Step 3 (cheap). + +### Step 4 — Generate the synthetic dataset + queries + GT + +#### **Option 1 (Fast path): nprobe GT.** +Uses the `nprobes` you settled on in Step 3 — only the `nprobes` clusters nearest each query are regenerated and brute-forced. As mentioned in the previous step, using about 5% of `n_clusters` is a good heuristic to reach ≥ 0.999. + +```bash +python -m cuvs_bench.synthesize_dataset generate \ + --fingerprint fingerprint.npz \ + --total_rows 1000000000 \ + --n_queries 10000 \ + --k 10 \ + --nprobes 500 \ + --gt_mode nprobe \ + --output_dir synthetic_100M/ +``` + +#### **Option 2 (Exact path, slower): exact GT.** +Set `--gt_mode exact` for exact GT — every cluster is regenerated on the GPU and brute-forced against the queries. Slower than the nprobe path, but still faster than a standard GT computation at the same `total_rows` because there's no disk I/O or host↔device transfer of the dataset. + +```bash +python -m cuvs_bench.synthesize_dataset generate \ + --fingerprint fingerprint.npz \ + --total_rows 1000000000 \ + --n_queries 10000 \ + --k 10 \ + --gt_mode exact \ + --output_dir synthetic_100M/ +``` + +Outputs (cuvs-bench-compatible): + +- `synthetic_100M/base.fbin` +- `synthetic_100M/queries.fbin` +- `synthetic_100M/groundtruth.neighbors.ibin` +- `synthetic_100M/groundtruth.distances.fbin` + +> **Disk-size note.** `base.fbin` is materialized to disk at full `total_rows × ncols × 4` bytes. Make sure `--output_dir` lives on a volume with enough free space before kicking off a large run. Generation streams cluster-by-cluster directly to disk, so host RAM stays bounded (~a few GB) regardless of `total_rows` — only the disk volume matters. + +### Step 5 — Register the synthetic dataset for `cuvs_bench.run` + +`cuvs_bench.run` resolves datasets by **name** from a YAML registry and expects a directory layout where each dataset lives in its own subdirectory under a common `--dataset-path`. Drop the `synthetic_100M/` directory you just produced under whatever you use as your dataset root, and add a YAML entry that points at it. + +**1. Lay out the files** so `--dataset-path` is the parent: + +```text +$RAPIDS_DATASET_ROOT_DIR/ +└── my_real_data_synth_100M/ # ← synthetic, produced in Step 4 + ├── base.fbin + ├── queries.fbin + ├── groundtruth.neighbors.ibin + └── groundtruth.distances.fbin +``` + +**2. Register the dataset** in a YAML file (`my_datasets.yaml`): + +```yaml +- name: my_real_data_synth_100M + dims: 768 + distance: euclidean + base_file: my_real_data_synth_100M/base.fbin + query_file: my_real_data_synth_100M/queries.fbin + groundtruth_neighbors_file: my_real_data_synth_100M/groundtruth.neighbors.ibin + groundtruth_distances_file: my_real_data_synth_100M/groundtruth.distances.fbin +``` + +The `distance` value must match what your real dataset uses (`euclidean` or `inner_product`). Use the same `dims` you fitted with. + +### Step 6 — Benchmark and plot + +Run the algorithms you care about and produce a recall-vs-throughput (or recall-vs-latency) Pareto plot: + +```bash +python -m cuvs_bench.run \ + --dataset my_real_data_synth_100M \ + --dataset-path $RAPIDS_DATASET_ROOT_DIR \ + --dataset-configuration ./my_datasets.yaml \ + --algorithms cuvs_cagra,cuvs_ivf_pq,hnswlib \ + --build --search + +python -m cuvs_bench.plot \ + --dataset my_real_data_synth_100M \ + --dataset-path $RAPIDS_DATASET_ROOT_DIR \ + --algorithms cuvs_cagra,cuvs_ivf_pq,hnswlib \ + --output-filepath ./plots/ +``` + +The run drops results under `$RAPIDS_DATASET_ROOT_DIR//result/{build,search}/...` as JSON + auto-exported CSV. The plot is one PNG per dataset. + +### Step 7 — *(Optional validation)* Compare against the real dataset + +If you *also* have the real dataset at the same `total_rows`, you can validate that the synthetic-data benchmark numbers are a faithful stand-in for the real ones. If the synthetic dataset is built with the right parameters, both curves should overlap closely across the algorithms you ran. Repeat Steps 5–6 with the real dataset, then compare. + +> **Use random-jitter queries on the real dataset too.** The synthetic dataset's queries are random-jitter by construction (sampled from the data and perturbed with a small Gaussian — see [Query generation](#query-generation-random-jitter)), so the synthetic benchmark already measures index behavior under a *data-query distribution* where queries sit on the data manifold. To make the real-vs-synth comparison apples-to-apples, the real dataset's queries should follow the same scheme — otherwise you'd be comparing two different query distributions on top of two different data distributions, and any gap in the curves could be either effect. Generate them with: +> +> ```bash +> python -m cuvs_bench.generate_groundtruth \ +> /path/to/your_real_data.fbin \ +> --queries=random-jitter \ +> --n_queries=10000 \ +> --k=10 \ +> --output=$RAPIDS_DATASET_ROOT_DIR/my_real_data +> ``` +> +> This writes `queries.fbin`, `groundtruth.neighbors.ibin`, and `groundtruth.distances.fbin` into the real dataset's directory; reference those as `query_file` / `groundtruth_*_file` in the YAML below. +> +> If the real dataset already ships with a fixed benchmark query set you specifically want to evaluate on, you can use that instead — but the comparison is then no longer a clean test of the data-distribution match alone, so you'll likely need to **re-tune the synthetic fingerprint** (loop back to Step 1). Do the tuning *at parity* — generate the synthetic dataset at the same `total_rows` as the real dataset, plot real-vs-synth, and adjust the knobs until the curves overlap before scaling `--total_rows` up. This is the same loop the [Sanity check before scaling up](#sanity-check-before-scaling-up) paragraph below describes. + +#### Sanity check before scaling up + +For your first comparison run, generate the synthetic dataset *at parity* — i.e. at the **same `total_rows` as the real dataset** (set `--total_rows` in Step 4 to the real row count, so SF=100 still holds for your fitting sample). At parity the only difference between the two benchmark runs is real-vs-synth data, not size-vs-size, so any gap between the Pareto curves is cleanly attributable to the fingerprint. If the curves don't match at parity, increasing `total_rows` won't fix it — go back to Step 1 and revisit the fingerprint knobs (`ncomp`, `n_clusters`, `sample_size`). Once they match at parity, scale `--total_rows` up freely. + + +# Experiments + +We plan to ship pre-fit fingerprints for three commonly-benchmarked datasets — **Falcon** (960M x 1024d), **BigANN** (1B x 128d), and **Wiki** (88M x 768d). Each will be a single `.npz` you can pass straight to Step 4 (`--fingerprint .npz`) and skip Steps 1–3. The experiments below were produced with those fingerprints. + +In the plots below, **SS** = *sample size* (rows from the real dataset used to fit the fingerprint in Steps 1–3) and **TS** = *target size* (`--total_rows` passed to Step 4, i.e. the number of synthetic rows generated). + + +### Falcon — DiskANN, synth vs real (960M, 1024d) + +![Falcon — DiskANN synth-vs-real Pareto](figures/diskann_falcon.png) + +*Setup:* synthetic dataset generated from `falcon` fingerprint using the command below, compared against the real Falcon 960M and subsets on the same DiskANN build/search sweep, k=10. + +```bash +python -m cuvs_bench.synthesize_dataset generate \ + --fingerprint path/to/falcon_10M_nc100000_ncomp32_fingerprint.npz \ + --total_rows 960000000 \ + --gt_mode exact \ + --output_dir synthetic_falcon_960M/ +``` + + +### BigANN — synth vs real (1B, 128d) + +![Falcon — DiskANN synth-vs-real Pareto](figures/diskann_bigann.png) + +*Setup:* synthetic dataset generated from `bigann` fingerprint using the command below, compared against the real BigANN 1B and subsets on the same build/search sweep, k=10. + +```bash +python -m cuvs_bench.synthesize_dataset generate \ + --fingerprint path/to/bigann_10M_nc100000_ncomp32_fingerprint.npz \ + --total_rows 1000000000 \ + --gt_mode exact \ + --output_dir synthetic_bigann_1B/ +``` + + +### Wiki — synth vs real (88M, 768d) + +![Falcon — DiskANN synth-vs-real Pareto](figures/diskann_wiki.png) + +*Setup:* synthetic dataset generated from `wiki` fingerprint using the command below, compared against the real Wiki 88M and subsets on the same build/search sweep, k=10. + +```bash +python -m cuvs_bench.synthesize_dataset generate \ + --fingerprint path/to/wiki_880K_nc8800_ncomp32_fingerprint.npz \ + --total_rows 88000000 \ + --gt_mode exact \ + --output_dir synthetic_wiki_88M/ +``` + + +## Caveats + +- The synthetic data is calibrated to reproduce **how a vector index behaves on your real data** (recall, throughput, build time), not the raw distance values themselves. Use it to compare *index configurations*, not as a substitute for distance-correctness tests. +- `ncomp` is the dominant parameter. Above the ceiling, recall degrades uniformly across the entire recall range. +- Queries are sampled from the synthetic distribution itself (with small noise added) + +## Glossary + +- **`nc`** — number of KMeans clusters. +- **`ncomp`** — PCA components per cluster (the rank of the per-cluster covariance approximation). +- **`PPC`** — points per cluster in the fitting sample (= `sample_size / nc`). Defaults assume PPC=100, which is the empirically validated sweet spot. +- **`SF`** — scale factor (= `total_rows / sample_size`). How much we "stretch" the small sample to fill the target dataset. + +## Python API + +The CLI is a thin wrapper. For programmatic use: + +```python +from cuvs_bench.synthesize_dataset import ( + Fingerprint, + fit_fingerprint, + save_fingerprint, + load_fingerprint, + generate_synthetic_dataset, + generate_queries, + compute_groundtruth_nprobe, + compute_groundtruth_exact, + verify_groundtruth, +) +``` + +See module docstrings for argument details. diff --git a/python/cuvs_bench/cuvs_bench/synthesize_dataset/__init__.py b/python/cuvs_bench/cuvs_bench/synthesize_dataset/__init__.py new file mode 100644 index 0000000000..34b92f6fc6 --- /dev/null +++ b/python/cuvs_bench/cuvs_bench/synthesize_dataset/__init__.py @@ -0,0 +1,42 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. +# SPDX-License-Identifier: Apache-2.0 +# + +from ._fingerprint import Fingerprint +from ._fit import fit_fingerprint +from ._generate import ( + gen_cluster_gpu, + generate_queries, + generate_synthetic_dataset, + generate_synthetic_dataset_to_file, + get_cluster_seed, + get_num_points_per_cluster, + resolve_norm_scheme, +) +from ._ground_truth import ( + compute_groundtruth_exact, + compute_groundtruth_nprobe, +) +from ._io import ( + load_fingerprint, + save_fingerprint, +) +from ._verify import verify_groundtruth + +__all__ = [ + "Fingerprint", + "load_fingerprint", + "compute_groundtruth_nprobe", + "compute_groundtruth_exact", + "fit_fingerprint", + "gen_cluster_gpu", + "generate_queries", + "generate_synthetic_dataset", + "generate_synthetic_dataset_to_file", + "get_cluster_seed", + "get_num_points_per_cluster", + "resolve_norm_scheme", + "save_fingerprint", + "verify_groundtruth", +] diff --git a/python/cuvs_bench/cuvs_bench/synthesize_dataset/__main__.py b/python/cuvs_bench/cuvs_bench/synthesize_dataset/__main__.py new file mode 100644 index 0000000000..da0f6e372d --- /dev/null +++ b/python/cuvs_bench/cuvs_bench/synthesize_dataset/__main__.py @@ -0,0 +1,399 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. +# SPDX-License-Identifier: Apache-2.0 +# +"""Command-line entry point for the synthesize_dataset module. + +- ``fit``: extract a cluster fingerprint (.npz) from a real dataset. +- ``generate``: turn a fingerprint into a ``base.fbin`` / ``queries.fbin`` / +``groundtruth.{neighbors,distances}.{ibin,fbin}`` bundle. +- ``verify``: validate the chosen ``nprobes``. + +Run ``python -m cuvs_bench.synthesize_dataset --help`` for help. +""" + +from __future__ import annotations + +import argparse +import os +import sys + +import numpy as np + +from ..generate_groundtruth.utils import write_bin +from ._fit import fit_fingerprint +from ._generate import ( + generate_queries, + generate_synthetic_dataset_to_file, + resolve_norm_scheme, +) +from ._ground_truth import ( + compute_groundtruth_nprobe, + compute_groundtruth_exact, +) +from ._io import ( + load_dataset, + load_fingerprint, + save_fingerprint, +) +from ._verify import verify_groundtruth + + +def _add_fit_parser(subparsers: argparse._SubParsersAction) -> None: + p = subparsers.add_parser( + "fit", + help="Extract a dataset fingerprint from a real dataset.", + description=( + "Run KMeans + per-cluster PCA on a sample of the real " + "dataset and save the resulting fingerprint as an NPZ file." + ), + ) + p.add_argument( + "--dataset", + required=True, + help="Path to the real dataset (.fbin, .npy, or .pkl).", + ) + p.add_argument( + "--output", + default=None, + help=( + "Path to write the fingerprint NPZ. Defaults to " + "'{dataset_stem}_nc{n_clusters}_ncomp{pca_components}" + "[_ss{sample_size}]_seed{seed}.npz' in the current " + "working directory. '_ss{sample_size}' is included only when " + "--sample_size is passed." + ), + ) + p.add_argument( + "--sample_size", + type=int, + default=None, + help=( + "Use only the FIRST `sample_size` rows of the input " + "(default: use entire input). No shuffling is performed, so " + "make sure the dataset's head-of-file slice is representative." + ), + ) + p.add_argument( + "--n_clusters", + type=int, + required=True, + help="Number of KMeans clusters.", + ) + p.add_argument( + "--pca_components", + type=int, + required=True, + help="Number of PCA components per cluster.", + ) + p.add_argument( + "--seed", + type=int, + default=42, + help="RNG seed for KMeans init and PCA (default: 42).", + ) + + +def _add_generate_parser(subparsers: argparse._SubParsersAction) -> None: + p = subparsers.add_parser( + "generate", + help="Materialize a synthetic dataset + queries + GT from a fingerprint.", + ) + p.add_argument( + "--fingerprint", required=True, help="Path to fingerprint NPZ." + ) + p.add_argument( + "--total_rows", + type=int, + required=True, + help="Number of synthetic vectors to generate.", + ) + p.add_argument( + "--n_queries", + type=int, + default=10_000, + help="Number of queries to sample (default: 10000).", + ) + p.add_argument( + "--k", + type=int, + default=10, + help="Number of nearest neighbors in the GT (default: 10).", + ) + p.add_argument( + "--nprobes", + type=int, + default=None, + help=( + "Clusters probed per query for nprobe GT. Must be smaller than " + "the number of clusters used to fit the fingerprint. " + "Defaults to 5%% of the fingerprint's cluster count unless specified when " + "--gt_mode=nprobe." + ), + ) + p.add_argument( + "--gt_mode", + choices=["nprobe", "exact"], + default="nprobe", + help=( + "GT computation mode. `nprobe` is the cheap default" + "; `exact` does an exact streaming brute force across all " + "clusters." + ), + ) + p.add_argument( + "--output_dir", + required=True, + help="Directory to write base.fbin, queries.fbin, groundtruth.*.", + ) + p.add_argument( + "--seed", + type=int, + default=42, + help="RNG seed for synthetic data and query sampling (default: 42).", + ) + + +def _add_verify_parser(subparsers: argparse._SubParsersAction) -> None: + p = subparsers.add_parser( + "verify", + help="Validate that nprobe GT matches exact GT.", + description=( + "Check whether the chosen nprobes (relative to the number of " + "clusters in the fingerprint) is sufficient to produce a " + "high-quality ground truth. Runs nprobe GT and exact GT on a " + "synthetic dataset and compares recall. Use a small --total_rows " + "for a quick sanity check before committing to " + "a full-scale generate run using ``generate''." + ), + ) + p.add_argument( + "--fingerprint", required=True, help="Path to fingerprint NPZ." + ) + p.add_argument( + "--total_rows", + type=int, + default=1_000_000, + help="Synthetic dataset size to verify against (default: 1M).", + ) + p.add_argument( + "--n_queries", + type=int, + default=10_000, + help="Number of queries (default: 10000).", + ) + p.add_argument( + "--k", + type=int, + default=10, + help="Number of nearest neighbors in the GT (default: 10).", + ) + p.add_argument( + "--nprobes", + type=int, + default=None, + help=( + "Clusters probed per query for the nprobe GT being verified. " + "Must be smaller than the number of clusters in the fingerprint. " + "Defaults to 5%% of the fingerprint's cluster count." + ), + ) + p.add_argument( + "--seed", + type=int, + default=42, + help="RNG seed for query sampling (default: 42).", + ) + + +def _cmd_fit(args: argparse.Namespace) -> int: + print(f"Loading dataset from {args.dataset}...") + data = load_dataset(args.dataset, sample_size=args.sample_size) + print(f"Loaded data shape: {data.shape}") + + fingerprint = fit_fingerprint( + data=data, + n_clusters=args.n_clusters, + pca_components=args.pca_components, + seed=args.seed, + ) + + output_path = args.output + if output_path is None: + stem = os.path.splitext(os.path.basename(args.dataset))[0] + ss_tag = "" if args.sample_size is None else f"_ss{args.sample_size}" + output_path = ( + f"{stem}_nc{args.n_clusters}_ncomp{args.pca_components}" + f"{ss_tag}_seed{args.seed}.npz" + ) + save_fingerprint(output_path, fingerprint) + print(f"Saved fingerprint to {output_path}") + return 0 + + +def _cmd_generate(args: argparse.Namespace) -> int: + print(f"Loading fingerprint from {args.fingerprint}...") + config = load_fingerprint(args.fingerprint, seed=args.seed) + if args.nprobes is not None and args.nprobes > config.nclusters: + sys.exit( + f"--nprobes ({args.nprobes}) must be <= the fingerprint's cluster " + f"count ({config.nclusters})." + ) + print( + f" {config.nclusters} clusters, {config.ncols} dims, " + f"ncomp={config.pca_n_components}, " + f"norm_scheme={resolve_norm_scheme(config)} " + ) + os.makedirs(args.output_dir, exist_ok=True) + + base_path = os.path.join(args.output_dir, "base.fbin") + generate_synthetic_dataset_to_file( + config=config, + total_points=args.total_rows, + output_path=base_path, + ) + + queries = generate_queries( + nqueries=args.n_queries, + total_rows=args.total_rows, + config=config, + ) + queries_path = os.path.join(args.output_dir, "queries.fbin") + write_bin(queries_path, queries) + + nprobes = args.nprobes + if args.gt_mode == "nprobe" and nprobes is None: + nprobes = max(1, round(config.nclusters * 0.05)) + print( + f" nprobes not set; using 5% of {config.nclusters} clusters = {nprobes}." + ) + + if args.gt_mode == "nprobe": + gt_idx, gt_dist, _ = compute_groundtruth_nprobe( + queries=queries, + total_rows=args.total_rows, + config=config, + k=args.k, + nprobes=nprobes, + ) + else: + gt_idx, gt_dist, _ = compute_groundtruth_exact( + queries=queries, + total_rows=args.total_rows, + config=config, + k=args.k, + ) + + gt_idx_path = os.path.join(args.output_dir, "groundtruth.neighbors.ibin") + gt_dist_path = os.path.join(args.output_dir, "groundtruth.distances.fbin") + write_bin(gt_idx_path, gt_idx.astype(np.uint32)) + write_bin(gt_dist_path, gt_dist.astype(np.float32)) + + print( + f"\nWrote synthetic dataset bundle to {args.output_dir}:\n" + f" base.fbin ({args.total_rows:,} x {config.ncols})\n" + f" queries.fbin ({args.n_queries} x {config.ncols})\n" + f" groundtruth.neighbors.ibin ({args.n_queries} x {args.k})\n" + f" groundtruth.distances.fbin ({args.n_queries} x {args.k})" + ) + return 0 + + +def _cmd_verify(args: argparse.Namespace) -> int: + print(f"Loading fingerprint from {args.fingerprint}...") + config = load_fingerprint(args.fingerprint, seed=args.seed) + if args.nprobes is not None and args.nprobes > config.nclusters: + sys.exit( + f"--nprobes ({args.nprobes}) must be <= the fingerprint's cluster " + f"count ({config.nclusters})." + ) + nprobes = args.nprobes + if nprobes is None: + nprobes = max(1, round(config.nclusters * 0.05)) + print( + f" nprobes not set; using 5% of {config.nclusters} clusters = {nprobes}." + ) + print( + f" {config.nclusters} clusters, {config.ncols} dims, " + f"norm_scheme={resolve_norm_scheme(config)}" + ) + + result = verify_groundtruth( + config=config, + total_rows=args.total_rows, + nqueries=args.n_queries, + k=args.k, + nprobes=nprobes, + ) + + print("\n" + "=" * 60) + print("Verification Result") + print("=" * 60) + print(f" Recall: {result['recall']:.4f}") + if result["recall"] >= 0.999: + print( + f"\n nprobes={nprobes} is safe at total_rows={args.total_rows:,}." + ) + else: + print( + f"\n nprobes={nprobes} drops recall below 0.999 -- " + f"consider increasing nprobes." + ) + return 0 + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser( + prog="python -m cuvs_bench.synthesize_dataset", + description=( + "Synthesize an ANN-benchmark dataset of arbitrary size from a " + "small real-data fingerprint. See README.md for the recommended " + "workflow." + ), + ) + subparsers = parser.add_subparsers(dest="command", required=True) + _add_fit_parser(subparsers) + _add_generate_parser(subparsers) + _add_verify_parser(subparsers) + + args = parser.parse_args(argv) + + if args.command == "fit": + if args.sample_size is not None and args.sample_size <= 0: + parser.error( + f"--sample_size must be > 0 when provided " + f"(got {args.sample_size})." + ) + if args.n_clusters <= 0: + parser.error(f"--n_clusters must be > 0 (got {args.n_clusters}).") + if args.pca_components <= 0: + parser.error( + f"--pca_components must be > 0 (got {args.pca_components})." + ) + elif args.command in ("generate", "verify"): + if args.total_rows <= 0: + parser.error(f"--total_rows must be > 0 (got {args.total_rows}).") + if args.n_queries <= 0: + parser.error(f"--n_queries must be > 0 (got {args.n_queries}).") + if args.n_queries > args.total_rows: + parser.error( + f"--n_queries ({args.n_queries}) must be <= --total_rows " + f"({args.total_rows}) for sampling without replacement." + ) + if args.k <= 0: + parser.error(f"--k must be > 0 (got {args.k}).") + if args.nprobes is not None and args.nprobes <= 0: + parser.error(f"--nprobes must be > 0 (got {args.nprobes}).") + + if args.command == "fit": + return _cmd_fit(args) + if args.command == "generate": + return _cmd_generate(args) + if args.command == "verify": + return _cmd_verify(args) + parser.print_help() + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/python/cuvs_bench/cuvs_bench/synthesize_dataset/_fingerprint.py b/python/cuvs_bench/cuvs_bench/synthesize_dataset/_fingerprint.py new file mode 100644 index 0000000000..b555c706fe --- /dev/null +++ b/python/cuvs_bench/cuvs_bench/synthesize_dataset/_fingerprint.py @@ -0,0 +1,91 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. +# SPDX-License-Identifier: Apache-2.0 +# +"""Fingerprint: in-memory representation of a fitted cluster fingerprint.""" + +from dataclasses import dataclass +from typing import List, Optional + +import numpy as np + + +@dataclass +class Fingerprint: + """In-memory representation of a fitted cluster fingerprint. + + Produced by ``cuvs_bench.synthesize_dataset.fit``. + + Parameters + ---------- + nclusters : int + Number of KMeans clusters fitted on the real-data sample. + ncols : int + Dimensionality of the data. + seed : int + Random seed used for deterministic per-cluster generation. + cluster_centers : np.ndarray, shape (nclusters, ncols) + Cluster centroids. + cluster_variances : np.ndarray, shape (nclusters, ncols) + Per-dimension variance per cluster, used for variance matching when + rescaling generated points. + cluster_densities : np.ndarray, shape (nclusters,) + Relative density per cluster. Normalized to sum to 1 in ``__post_init__``. + pca_components_list : list[Optional[np.ndarray]] + Per-cluster principal directions, each of shape ``(ncomp, ncols)``. + Entries may be ``None`` for clusters that fell back to diagonal-only + sampling because they had too few points to fit a rank-``ncomp`` PCA. + pca_explained_var_list : list[Optional[np.ndarray]] + Per-cluster variance along each principal direction, each of shape + ``(ncomp,)``. ``None`` for diagonal-fallback clusters. + pca_noise_var : np.ndarray, shape (nclusters,) + Residual noise variance per cluster. Used as the variance of an isotropic + Gaussian added to every generated point. + pca_n_components : int + The requested number of PCA components per cluster as specified at + fit time. + norm_quantiles : np.ndarray, optional + Per-cluster empirical inverse-CDF of the real vector norms, shape + ``(nclusters, 256)`` -- one 256-point quantile grid per cluster, + produced at fit time. Used by the ``"percentile"`` norm scheme to draw + a per-vector target norm via inverse-CDF sampling from that vector's + cluster grid, so each cluster reproduces its own real radial spread. + ``None`` for grid-less fingerprints (those fall back to ``"off"`` norm + rescaling at generate time. + norm_unit : bool + Set at fit time when the real vectors are essentially unit-norm. Generation + L2-normalizes instead of inverse-CDF sampling using norm_quantiles. + """ + + nclusters: int + ncols: int + seed: int + cluster_centers: np.ndarray + cluster_variances: np.ndarray + cluster_densities: np.ndarray + + pca_components_list: List[Optional[np.ndarray]] + pca_explained_var_list: List[Optional[np.ndarray]] + pca_noise_var: np.ndarray + pca_n_components: int + + norm_quantiles: Optional[np.ndarray] = None + norm_unit: bool = False + + def __post_init__(self): + if self.cluster_densities.shape != (self.nclusters,): + raise ValueError( + f"cluster_densities shape {self.cluster_densities.shape} does " + f"not match expected ({self.nclusters},)." + ) + if not np.isfinite(self.cluster_densities).all(): + raise ValueError( + "cluster_densities must contain only finite values " + "(no NaN/Inf)." + ) + if (self.cluster_densities < 0).any(): + raise ValueError("cluster_densities must be non-negative.") + total = float(self.cluster_densities.sum()) + if total <= 0.0: + raise ValueError("cluster_densities must sum to a positive value") + self.cluster_densities = self.cluster_densities / total diff --git a/python/cuvs_bench/cuvs_bench/synthesize_dataset/_fit.py b/python/cuvs_bench/cuvs_bench/synthesize_dataset/_fit.py new file mode 100644 index 0000000000..8fe86d57cc --- /dev/null +++ b/python/cuvs_bench/cuvs_bench/synthesize_dataset/_fit.py @@ -0,0 +1,223 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. +# SPDX-License-Identifier: Apache-2.0 +# +"""Fit a cluster fingerprint from a sample of real data. + +Pipeline: +1. Run KMeans (``cuvs.cluster.kmeans``) on the sample. +2. For each cluster, fit a rank-``ncomp`` PCA (``cuvs.preprocessing.pca``) on + the cluster's residuals, capturing the dominant intra-cluster correlations. +3. Estimate a residual noise variance per cluster (the variance not captured + by the top-``ncomp`` components). +4. Compute a per-cluster norm inverse-CDF (quantile grid) if needed, which is stored as + ``norm_quantiles`` and used by the "percentile" norm scheme at generate time. + +The output dict can be saved with :func:`save_fingerprint`. +""" + +from __future__ import annotations + +import time +from typing import Any, Dict + +import cupy as cp +import numpy as np +from cuvs.cluster import kmeans as cuvs_kmeans +from cuvs.preprocessing import pca as cuvs_pca +from tqdm import tqdm + +# Number of points in each per-cluster norm inverse-CDF quantile grid. +_NORM_QUANTILE_COUNT = 256 + +# Tolerance for checking if the real vectors are essentially unit-norm (mean ~1, tiny spread). +_NORM_UNIT_MEAN_TOL = 0.02 +_NORM_UNIT_CV_TOL = 0.05 + + +def _run_kmeans( + data: np.ndarray, + n_clusters: int, + seed: int, +): + """Run cuvs KMeans, returning ``(labels, centroids)`` as numpy arrays.""" + params = cuvs_kmeans.KMeansParams( + n_clusters=n_clusters, + init_size=len(data), + ) + + print( + f" Running cuvs KMeans (n_clusters={n_clusters}, " + f"init_size={len(data):,})..." + ) + + t0 = time.perf_counter() + centroids_out, _, _ = cuvs_kmeans.fit(params, data) + print(f" KMeans fit time: {time.perf_counter() - t0:.2f}s") + + centroids_gpu = cp.asarray(centroids_out) + data_gpu = cp.asarray(data, dtype=cp.float32) + labels_out, _ = cuvs_kmeans.predict(params, data_gpu, centroids_gpu) + + centroids = cp.asnumpy(centroids_gpu).astype(np.float32) + labels = cp.asnumpy(cp.asarray(labels_out)).astype(np.int64) + + del data_gpu, centroids_gpu + cp.get_default_memory_pool().free_all_blocks() + + return labels, centroids + + +def _fit_cluster_pca( + residuals: np.ndarray, n_components: int +) -> tuple[np.ndarray, np.ndarray]: + """Fit a single PCA via cuvs and return ``(components, explained_var)``.""" + residuals_gpu = cp.asarray(residuals, dtype=cp.float32) + params = cuvs_pca.Params(n_components=n_components, copy=True) + out = cuvs_pca.fit(params, residuals_gpu) + components = cp.asnumpy(cp.asarray(out.components)).astype(np.float32) + explained_var = cp.asnumpy(cp.asarray(out.explained_var)).astype( + np.float32 + ) + + del residuals_gpu, out + return components, explained_var + + +def fit_fingerprint( + data: np.ndarray, + n_clusters: int, + pca_components: int, + seed: int = 42, +) -> Dict[str, Any]: + """Fit a cluster fingerprint to a real dataset sample. + + Parameters + ---------- + data : np.ndarray, shape (n, d) + Real data to fit against. + n_clusters : int + Number of KMeans clusters. + pca_components : int + Number of principal directions per cluster. + seed : int + Random seed for KMeans initialization. + + Returns + ------- + dict with keys: + ``centroids`` : (n_clusters, d) float32 + ``densities`` : (n_clusters,) float64 + ``variances_per_dim`` : (n_clusters, d) float32 + ``pca_components_list`` : list of (ncomp, d) float32 arrays or None + ``pca_explained_var_list`` : list of (ncomp,) float32 arrays or None + ``pca_noise_var`` : (n_clusters,) float32 + ``pca_n_components`` : int (the requested ncomp) + ``norm_quantiles`` : (n_clusters, 256) float32 — per-cluster + empirical inverse-CDF of the real vector + norms. + """ + n_dim = data.shape[1] + data_sample = np.ascontiguousarray(data.astype(np.float32)) + + # Norm quantiles for the "percentile" norm scheme at generate time + quantile_levels = np.linspace(0.0, 1.0, _NORM_QUANTILE_COUNT) + norms_gpu = cp.linalg.norm(cp.asarray(data_sample), axis=1) + data_sample_norms = cp.asnumpy(norms_gpu).astype(np.float32) + norm_mean = float(norms_gpu.mean()) + norm_cv = float(norms_gpu.std() / max(norm_mean, 1e-12)) + global_norm_quantiles = cp.asnumpy( + cp.quantile(norms_gpu, cp.asarray(quantile_levels)) + ).astype(np.float32) + del norms_gpu + cp.get_default_memory_pool().free_all_blocks() + + norm_unit = ( + abs(norm_mean - 1.0) <= _NORM_UNIT_MEAN_TOL + and norm_cv <= _NORM_UNIT_CV_TOL + ) + if norm_unit: + print( + f" Real vector-norm distribution: mean={norm_mean:.4f}, " + f"CV={norm_cv:.4f} -> ~unit-norm; storing norm_unit flag " + ) + else: + print( + f" Real vector-norm distribution: mean={norm_mean:.4f}, " + f"CV={norm_cv:.4f} (stored as per-cluster " + f"{_NORM_QUANTILE_COUNT}-point quantile grids)." + ) + + labels, centroids = _run_kmeans(data_sample, n_clusters, seed) + + print( + f"Fitting per-cluster PCA (ncomp={pca_components}) for " + f"{n_clusters} clusters..." + ) + + densities = np.zeros(n_clusters, dtype=np.float64) + variances_per_dim = np.zeros((n_clusters, n_dim), dtype=np.float32) + pca_components_list: list = [] + pca_explained_var_list: list = [] + pca_noise_var = np.zeros(n_clusters, dtype=np.float32) + norm_quantiles = ( + None + if norm_unit + else np.empty((n_clusters, _NORM_QUANTILE_COUNT), dtype=np.float32) + ) + + for i in tqdm(range(n_clusters), desc="Per-cluster PCA"): + mask = labels == i + cluster_points = data_sample[mask] + n_points = len(cluster_points) + densities[i] = n_points / len(data_sample) + + if not norm_unit: + if n_points >= 1: + norm_quantiles[i] = np.quantile( + data_sample_norms[mask], quantile_levels + ).astype(np.float32) + else: + norm_quantiles[i] = global_norm_quantiles + + if n_points < pca_components + 1: + # Diagonal fallback: too few points for a rank-ncomp PCA. + if n_points > 1: + variances_per_dim[i] = np.var(cluster_points, axis=0) + else: + variances_per_dim[i] = np.full(n_dim, 0.01, dtype=np.float32) + pca_components_list.append(None) + pca_explained_var_list.append(None) + pca_noise_var[i] = float(np.mean(variances_per_dim[i])) + continue + + variances_per_dim[i] = np.var(cluster_points, axis=0) + + residuals = cluster_points - centroids[i] + n_components = min(pca_components, n_points - 1, n_dim) + components, explained_var = _fit_cluster_pca(residuals, n_components) + + pca_components_list.append(components) + pca_explained_var_list.append(explained_var) + + # Residual variance not captured by the top-ncomp components. + total_var = float(np.var(residuals)) + pca_noise_var[i] = max(total_var - float(explained_var.sum()), 1e-6) + + n_valid = sum(1 for c in pca_components_list if c is not None) + print( + f" {n_valid}/{n_clusters} clusters have a rank-ncomp PCA; " + f"{n_clusters - n_valid} fell back to diagonal (too few points)." + ) + + return { + "centroids": centroids.astype(np.float32), + "densities": densities, + "variances_per_dim": variances_per_dim.astype(np.float32), + "pca_components_list": pca_components_list, + "pca_explained_var_list": pca_explained_var_list, + "pca_noise_var": pca_noise_var.astype(np.float32), + "pca_n_components": int(pca_components), + "norm_quantiles": norm_quantiles, + "norm_unit": norm_unit, + } diff --git a/python/cuvs_bench/cuvs_bench/synthesize_dataset/_generate.py b/python/cuvs_bench/cuvs_bench/synthesize_dataset/_generate.py new file mode 100644 index 0000000000..3620f5469e --- /dev/null +++ b/python/cuvs_bench/cuvs_bench/synthesize_dataset/_generate.py @@ -0,0 +1,377 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. +# SPDX-License-Identifier: Apache-2.0 +# +"""Per-cluster Gaussian sampler with rank-``ncomp`` covariance. + +Each cluster ``c`` defines a Gaussian: + + x ~ centroid_c + components_c.T @ z + n, + z ~ N(0, diag(explained_var_c)) + n ~ N(0, noise_var_c * I) + +After projection-plus-noise, we rescale per-dimension so the empirical +variance of the generated batch matches the cluster's per-dimension target. +""" + +from __future__ import annotations + +import os +import threading +from typing import Union + +import cupy as cp +import numpy as np +from tqdm import tqdm + +from .._bin_format import write_bin_header +from ..generate_groundtruth.utils import add_jitter +from ._fingerprint import Fingerprint + +ArrayLike = Union[np.ndarray, cp.ndarray] + + +def get_cluster_seed(base_seed: int, cluster_id: int) -> int: + """Map ``(base_seed, cluster_id)`` to a deterministic per-cluster seed.""" + return base_seed * 1_000_000 + int(cluster_id) + + +def resolve_norm_scheme(config: Fingerprint) -> str: + """Norm-rescaling scheme for generation, derived from the fingerprint. + + - ``"unit"``: fit flagged the data as unit-norm -> L2-normalize. + - ``"percentile"``: fit stored a per-cluster norm-quantile grid -> each cluster + reproduces its own real radial spread via inverse-CDF sampling. + - ``"off"``: neither -> vectors keep their natural generated magnitude. + """ + if getattr(config, "norm_unit", False): + return "unit" + has_quantiles = getattr(config, "norm_quantiles", None) is not None + return "percentile" if has_quantiles else "off" + + +def _rescale_to_scheme( + points: cp.ndarray, + scheme: str, + config: Fingerprint, + cluster_id: int, + rng: cp.random.RandomState, +) -> cp.ndarray: + if scheme == "off" or points.shape[0] == 0: + return points + + cur = cp.maximum(cp.linalg.norm(points, axis=1, keepdims=True), 1e-8) + if scheme == "unit": + return points / cur + + # "percentile": draw each vector's target norm from its cluster's inverse-CDF. + quantiles = cp.asarray(config.norm_quantiles[cluster_id], dtype=cp.float32) + u = rng.random_sample(size=points.shape[0], dtype=cp.float32) + grid = cp.linspace(0.0, 1.0, len(quantiles), dtype=cp.float32) + target = cp.interp(u, grid, quantiles).astype(cp.float32)[:, None] + + return points * (target / cur) + + +def gen_cluster_gpu( + cluster_id: int, + n_points: int, + config: Fingerprint, + return_cupy: bool = True, +) -> ArrayLike: + """Generate ``n_points`` points for one cluster on the GPU.""" + center = cp.asarray(config.cluster_centers[cluster_id]) + seed = get_cluster_seed(config.seed, cluster_id) + rng = cp.random.RandomState(seed) + + components_host = config.pca_components_list[cluster_id] + explained_var_host = config.pca_explained_var_list[cluster_id] + noise_var = float(config.pca_noise_var[cluster_id]) + + if components_host is not None: + components = cp.asarray(components_host) + explained_var = cp.asarray(explained_var_host) + k = len(explained_var) + + z = rng.standard_normal( + size=(n_points, k), dtype=cp.float32 + ) * cp.sqrt(cp.maximum(explained_var, 0.0)) + projected = z @ components + + noise_std = float(cp.sqrt(max(noise_var, 0.0))) + noise = ( + rng.standard_normal( + size=(n_points, config.ncols), dtype=cp.float32 + ) + * noise_std + ) + + centered = projected + noise + + # Per-dimension scaling for variance matching, applied only when we + # have enough points for a reliable empirical variance estimate. + if n_points >= 10: + target_var = cp.asarray(config.cluster_variances[cluster_id]) + actual_var = cp.var(centered, axis=0) + # Floor the actual variance at noise_var (the minimum expected + # variance) so we never divide by an unreliably small estimate. + scale = cp.sqrt(target_var / cp.maximum(actual_var, noise_var)) + # Cap the scale at 5x to prevent blowup from noisy estimates. + scale = cp.minimum(scale, 5.0) + centered = centered * scale + + points = (center + centered).astype(cp.float32) + else: + # Diagonal fallback (in-place for memory efficiency at scale). + scale = cp.sqrt(cp.asarray(config.cluster_variances[cluster_id])) + points = rng.standard_normal( + size=(n_points, config.ncols), dtype=cp.float32 + ) + points *= scale + points += center + + # rescale norm to match radial distribution + scheme = resolve_norm_scheme(config) + points = _rescale_to_scheme(points, scheme, config, cluster_id, rng) + + if return_cupy: + return points + return cp.asnumpy(points) + + +def get_num_points_per_cluster( + total_points: int, + config: Fingerprint, + min_points_per_cluster: int = 1, +) -> np.ndarray: + """Allocate ``total_points`` across clusters proportional to densities. + + Each cluster gets at least ``min_points_per_cluster``, then the remainder + is distributed by density. Remaining points are spread round-robin so + no single cluster absorbs the rounding error. + """ + nclusters = config.nclusters + min_required = nclusters * min_points_per_cluster + if total_points < min_required: + raise ValueError( + f"total_points={total_points} is less than " + f"nclusters * min_points_per_cluster = {min_required}. " + f"Increase total_points or decrease nclusters." + ) + + points_per_cluster = np.full( + nclusters, min_points_per_cluster, dtype=np.int64 + ) + remaining = total_points - min_required + if remaining > 0: + extra = (config.cluster_densities * remaining).astype(np.int64) + points_per_cluster += extra + + diff = total_points - int(points_per_cluster.sum()) + if diff != 0: + for i in range(abs(diff)): + c = i % nclusters + points_per_cluster[c] += 1 if diff > 0 else -1 + return points_per_cluster + + +def generate_synthetic_dataset( + config: Fingerprint, + total_points: int, +) -> np.ndarray: + """Materialize a full ``(total_points, ncols)`` synthetic dataset. + + For very large ``total_points``, prefer + :func:`generate_synthetic_dataset_to_file` to keep host RAM bounded. + """ + points_per_cluster = get_num_points_per_cluster(total_points, config) + + ncols = config.cluster_centers.shape[1] + result = np.empty((total_points, ncols), dtype=np.float32) + write_idx = 0 + + iterator = tqdm( + range(config.nclusters), desc="Generating synthetic dataset" + ) + + for cluster_id in iterator: + n_points = int(points_per_cluster[cluster_id]) + if n_points <= 0: + continue + cluster_points = gen_cluster_gpu( + cluster_id, + n_points, + config, + return_cupy=False, + ) + result[write_idx : write_idx + n_points] = cluster_points + write_idx += n_points + + cp.get_default_memory_pool().free_all_blocks() + + return result + + +def generate_synthetic_dataset_to_file( + config: Fingerprint, + total_points: int, + output_path: str, + batch_size: int = 1_000_000, +) -> None: + """Stream a synthetic dataset directly to a fbin file. + + Parameters + ---------- + config : Fingerprint + Fitted cluster fingerprint. + total_points : int + Number of synthetic vectors to generate. + output_path : str + Destination ``.fbin`` path. Parent directories are created as needed. + batch_size : int + Number of rows for a flush threshold. + """ + out_dir = os.path.dirname(output_path) + if out_dir: + os.makedirs(out_dir, exist_ok=True) + + points_per_cluster = get_num_points_per_cluster(total_points, config) + ncols = int(config.cluster_centers.shape[1]) + + # Buffer must be large enough to absorb the cluster that triggers a + # flush without overflowing + buf_capacity = batch_size + int(points_per_cluster.max()) + bufs = [ + np.empty((buf_capacity, ncols), dtype=np.float32) for _ in range(2) + ] + active_buf = 0 + buf_offset = 0 + rows_written = 0 + write_thread: threading.Thread | None = None + write_exception: Exception | None = None + + def _wait_for_write() -> None: + nonlocal write_thread, write_exception + if write_thread is not None: + write_thread.join() + write_thread = None + if write_exception is not None: + exc = write_exception + write_exception = None + raise exc + + def _write_buffer(f, buf_view: np.ndarray) -> None: + nonlocal write_exception + try: + buf_view.tofile(f) + except Exception as exc: + write_exception = exc + + def _flush_async(f, buf_view: np.ndarray) -> None: + nonlocal write_thread + write_thread = threading.Thread( + target=lambda: _write_buffer(f, buf_view), daemon=True + ) + write_thread.start() + + iterator = tqdm( + range(config.nclusters), + desc="Streaming synthetic dataset", + total=config.nclusters, + ) + + with open(output_path, "wb") as f: + write_bin_header(f, total_points, ncols) + + for cluster_id in iterator: + n_points = int(points_per_cluster[cluster_id]) + if n_points <= 0: + continue + + cluster_points = gen_cluster_gpu( + cluster_id, + n_points, + config, + return_cupy=False, + ) + + bufs[active_buf][buf_offset : buf_offset + n_points] = ( + cluster_points + ) + buf_offset += n_points + del cluster_points + + if buf_offset >= batch_size: + # Wait for the previous write to complete before reusing + # its buffer + _wait_for_write() + _flush_async(f, bufs[active_buf][:buf_offset]) + rows_written += buf_offset + active_buf = 1 - active_buf + buf_offset = 0 + cp.get_default_memory_pool().free_all_blocks() + + # wait for any in-flight write, then flush whatever's left in the active buffer. + _wait_for_write() + if buf_offset > 0: + bufs[active_buf][:buf_offset].tofile(f) + rows_written += buf_offset + + cp.get_default_memory_pool().free_all_blocks() + + if rows_written != total_points: + print( + f" Note: wrote {rows_written:,}/{total_points:,} rows " + f"(some clusters yielded zero points)." + ) + + +def generate_queries( + nqueries: int, + total_rows: int, + config: Fingerprint, + query_seed_offset: int = 999_999, +) -> np.ndarray: + """Sample ``nqueries`` query points from the synthetic distribution. + + For each query we pick a global index uniformly from ``range(total_rows)``, + determine which cluster it falls into, regenerate that cluster's points + deterministically, and grab the corresponding local index. Small Gaussian + noise is added at the end to avoid exact-match recall artefacts. + """ + scheme = resolve_norm_scheme(config) + points_per_cluster = get_num_points_per_cluster(total_rows, config) + cumsum = np.cumsum(points_per_cluster) + + rng = np.random.default_rng(config.seed + query_seed_offset) + global_indices = rng.choice(total_rows, size=nqueries, replace=False) + + # ``searchsorted(side='right')`` maps a global index to the cluster_id of + # the cluster whose cumulative range contains it. + cluster_ids = np.searchsorted(cumsum, global_indices, side="right") + + # Group queries by cluster so we generate each cluster only once. + cluster_to_queries: dict[int, list[tuple[int, int]]] = {} + for q_idx, (g_idx, c_id) in enumerate(zip(global_indices, cluster_ids)): + local_idx = g_idx if c_id == 0 else g_idx - cumsum[c_id - 1] + cluster_to_queries.setdefault(int(c_id), []).append( + (q_idx, int(local_idx)) + ) + + queries = np.zeros((nqueries, config.ncols), dtype=np.float32) + for c_id, qlist in tqdm( + cluster_to_queries.items(), + total=len(cluster_to_queries), + desc="Generating query points", + ): + n_points = int(points_per_cluster[c_id]) + cluster_points = gen_cluster_gpu( + c_id, + n_points, + config, + return_cupy=False, + ) + for q_idx, local_idx in qlist: + queries[q_idx] = cluster_points[local_idx] + + return add_jitter(queries, rng, normalize=(scheme == "unit")) diff --git a/python/cuvs_bench/cuvs_bench/synthesize_dataset/_ground_truth.py b/python/cuvs_bench/cuvs_bench/synthesize_dataset/_ground_truth.py new file mode 100644 index 0000000000..7f41c3887c --- /dev/null +++ b/python/cuvs_bench/cuvs_bench/synthesize_dataset/_ground_truth.py @@ -0,0 +1,218 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. +# SPDX-License-Identifier: Apache-2.0 +# +"""Query generation and ground-truth computation for the synthetic dataset. + +Two GT modes: + +- :func:`compute_groundtruth_exact` checks every cluster, computes brute-force + k-NN against the queries, and merges results across clusters. +- :func:`compute_groundtruth_nprobe` only checks the ``nprobes`` nearest + clusters per query (IVF-style). +""" + +from __future__ import annotations + +import time +from typing import Tuple + +import cupy as cp +import numpy as np +from cuvs.neighbors import brute_force as cuvs_brute_force +from tqdm import tqdm + +from ._fingerprint import Fingerprint +from ._generate import gen_cluster_gpu, get_num_points_per_cluster + + +def _bf_knn_gpu( + database_gpu: cp.ndarray, + queries_gpu: cp.ndarray, + k: int, + metric: str = "sqeuclidean", +) -> Tuple[np.ndarray, np.ndarray]: + """Brute-force k-NN on GPU using cuvs.""" + index = cuvs_brute_force.build(database_gpu, metric=metric) + distances, indices = cuvs_brute_force.search(index, queries_gpu, k) + return indices.copy_to_host(), distances.copy_to_host() + + +def _find_nearby_clusters( + queries: np.ndarray, + config: Fingerprint, + nprobes: int, + metric: str = "sqeuclidean", +) -> np.ndarray: + """Return the ``nprobes`` nearest cluster ids for each query.""" + centers_gpu = cp.asarray(config.cluster_centers, dtype=cp.float32) + queries_gpu = cp.asarray(queries, dtype=cp.float32) + indices, _ = _bf_knn_gpu(centers_gpu, queries_gpu, nprobes, metric=metric) + return indices # (nqueries, nprobes) + + +def compute_groundtruth_exact( + queries: np.ndarray, + total_rows: int, + config: Fingerprint, + k: int, + metric: str = "sqeuclidean", +) -> Tuple[np.ndarray, np.ndarray, dict]: + """Exact brute-force GT, computed by streaming every cluster. + Each cluster is regenerated, brute-forced against the queries, and the + running top-k is merged. + """ + if total_rows <= 0: + raise ValueError(f"total_rows must be > 0, got {total_rows}.") + if not 1 <= k <= total_rows: + raise ValueError( + f"k must be in [1, total_rows]; got k={k}, total_rows={total_rows}. " + ) + nqueries = len(queries) + points_per_cluster = get_num_points_per_cluster(total_rows, config) + cumsum = np.cumsum(points_per_cluster) + + gt_indices = np.full((nqueries, k), -1, dtype=np.int64) + gt_distances = np.full((nqueries, k), np.inf, dtype=np.float32) + + queries_gpu = cp.asarray(queries, dtype=cp.float32) + + total_gen_time = 0.0 + total_bf_time = 0.0 + total_merge_time = 0.0 + + iterator = tqdm(range(config.nclusters), desc="Exact GT (streaming)") + + for cluster_id in iterator: + n_points = int(points_per_cluster[cluster_id]) + global_start = 0 if cluster_id == 0 else int(cumsum[cluster_id - 1]) + + t0 = time.perf_counter() + cluster_gpu = gen_cluster_gpu( + cluster_id, n_points, config, return_cupy=True + ) + total_gen_time += time.perf_counter() - t0 + + t0 = time.perf_counter() + k_cluster = min(k, n_points) + local_indices, local_dists = _bf_knn_gpu( + cluster_gpu, queries_gpu, k_cluster, metric=metric + ) + total_bf_time += time.perf_counter() - t0 + + global_indices = local_indices.astype(np.int64) + global_start + + t0 = time.perf_counter() + merged_idx = np.concatenate([gt_indices, global_indices], axis=1) + merged_dist = np.concatenate([gt_distances, local_dists], axis=1) + order = np.argsort(merged_dist, axis=1)[:, :k] + gt_indices = np.take_along_axis(merged_idx, order, axis=1) + gt_distances = np.take_along_axis(merged_dist, order, axis=1) + total_merge_time += time.perf_counter() - t0 + + # Free cluster's points before the next iteration. + del cluster_gpu + + timing = { + "gen_time": total_gen_time, + "bf_time": total_bf_time, + "merge_time": total_merge_time, + } + return gt_indices, gt_distances, timing + + +def compute_groundtruth_nprobe( + queries: np.ndarray, + total_rows: int, + config: Fingerprint, + k: int, + nprobes: int, + metric: str = "sqeuclidean", +) -> Tuple[np.ndarray, np.ndarray, dict]: + """Cheap GT via cluster probing: only the ``nprobes`` nearest clusters + per query are regenerated and searched. + """ + if total_rows <= 0: + raise ValueError(f"total_rows must be > 0, got {total_rows}.") + if not 1 <= k <= total_rows: + raise ValueError( + f"k must be in [1, total_rows]; got k={k}, total_rows={total_rows}. " + ) + if not 1 <= nprobes <= config.nclusters: + raise ValueError( + f"nprobes must be in [1, nclusters]; got nprobes={nprobes}, " + f"nclusters={config.nclusters}." + ) + nqueries = len(queries) + points_per_cluster = get_num_points_per_cluster(total_rows, config) + cumsum = np.cumsum(points_per_cluster) + + t0 = time.perf_counter() + nearby = _find_nearby_clusters(queries, config, nprobes, metric=metric) + find_clusters_time = time.perf_counter() - t0 + + gt_indices = np.full((nqueries, k), -1, dtype=np.int64) + gt_distances = np.full((nqueries, k), np.inf, dtype=np.float32) + + # Reverse mapping: cluster_id -> queries that probe it. + t0 = time.perf_counter() + cluster_to_queries: dict[int, list[int]] = {} + for q_idx in range(nqueries): + for c_id in nearby[q_idx]: + cluster_to_queries.setdefault(int(c_id), []).append(q_idx) + mapping_time = time.perf_counter() - t0 + + total_gen_time = 0.0 + total_bf_time = 0.0 + total_merge_time = 0.0 + + iterator = tqdm( + cluster_to_queries.items(), + desc="Nprobe GT", + total=len(cluster_to_queries), + ) + + for cluster_id, q_list in iterator: + n_points = int(points_per_cluster[cluster_id]) + global_start = 0 if cluster_id == 0 else int(cumsum[cluster_id - 1]) + q_idx_arr = np.asarray(q_list, dtype=np.int64) + batch_queries = queries[q_idx_arr] + batch_queries_gpu = cp.asarray(batch_queries, dtype=cp.float32) + + t0 = time.perf_counter() + cluster_gpu = gen_cluster_gpu( + cluster_id, n_points, config, return_cupy=True + ) + total_gen_time += time.perf_counter() - t0 + + t0 = time.perf_counter() + k_cluster = min(k, n_points) + local_indices, local_dists = _bf_knn_gpu( + cluster_gpu, batch_queries_gpu, k_cluster, metric=metric + ) + total_bf_time += time.perf_counter() - t0 + + global_indices = local_indices.astype(np.int64) + global_start + + t0 = time.perf_counter() + cur_idx = gt_indices[q_idx_arr] + cur_dist = gt_distances[q_idx_arr] + merged_idx = np.concatenate([cur_idx, global_indices], axis=1) + merged_dist = np.concatenate([cur_dist, local_dists], axis=1) + order = np.argsort(merged_dist, axis=1)[:, :k] + gt_indices[q_idx_arr] = np.take_along_axis(merged_idx, order, axis=1) + gt_distances[q_idx_arr] = np.take_along_axis( + merged_dist, order, axis=1 + ) + total_merge_time += time.perf_counter() - t0 + + del cluster_gpu + + timing = { + "find_clusters_time": find_clusters_time, + "mapping_time": mapping_time, + "gen_time": total_gen_time, + "bf_time": total_bf_time, + "merge_time": total_merge_time, + } + return gt_indices, gt_distances, timing diff --git a/python/cuvs_bench/cuvs_bench/synthesize_dataset/_io.py b/python/cuvs_bench/cuvs_bench/synthesize_dataset/_io.py new file mode 100644 index 0000000000..6c1074a2ed --- /dev/null +++ b/python/cuvs_bench/cuvs_bench/synthesize_dataset/_io.py @@ -0,0 +1,203 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. +# SPDX-License-Identifier: Apache-2.0 +# +"""Dataset loading and fingerprint NPZ I/O. + +Supported dataset formats for :func:`load_dataset`: ``.fbin``, ``.npy``, +and ``.pkl``. + +Fingerprint NPZ file schema (written by :func:`save_fingerprint`, read by +:func:`load_fingerprint`): + +- ``centroids`` : float32, shape (nclusters, ncols) +- ``densities`` : float64, shape (nclusters,) +- ``variances_per_dim`` : float32, shape (nclusters, ncols) +- ``pca_components_arr`` : object array of length nclusters; entry i is + either a (ncomp, ncols) float32 array or an empty + float32 array if cluster i fell back to diagonal. +- ``pca_explained_var_arr`` : object array of length nclusters, parallel to + ``pca_components_arr``; each entry is a (ncomp,) + float32 array. +- ``pca_noise_var`` : float32, shape (nclusters,) +- ``pca_n_components`` : int, scalar (the requested ncomp; actual ncomp per + cluster may be smaller if it had too few points). +- ``norm_quantiles`` : float32, shape (nclusters, 256) — per-cluster + empirical inverse-CDF of the real vector norms. +""" + +from __future__ import annotations + +import os +import pickle +from typing import Any, Dict + +import numpy as np + +from ..generate_groundtruth.utils import memmap_bin_file +from ._fingerprint import Fingerprint + + +def _finalize_dataset(data: np.ndarray) -> np.ndarray: + """Validate a loaded dataset and return it as contiguous float32.""" + if data.ndim != 2: + raise ValueError( + f"dataset must be 2D (n_rows, n_cols); got ndim={data.ndim}, " + f"shape={data.shape}." + ) + if data.shape[1] == 0: + raise ValueError( + f"dataset must have n_cols > 0; got shape={data.shape}." + ) + return np.ascontiguousarray(data.astype(np.float32)) + + +def load_dataset( + path: str, + sample_size: int | None = None, + dtype: np.dtype = np.float32, +) -> np.ndarray: + """Load a real dataset for fitting. + + Supported formats: + - ``.fbin``: cuvs-bench binary header (``n_rows``, ``n_dim``; legacy + uint32 or extended uint64, auto-detected) followed by raw data. + - ``.npy``: standard numpy array file. + - ``.pkl``: pickled numpy array (or anything ``np.array`` can convert). + + Parameters + ---------- + path : str + Path to the dataset file. + sample_size : int or None + If given and smaller than the on-disk row count, only the **first** + ``sample_size`` rows are loaded (no shuffling). The caller is + responsible for ensuring the head-of-file slice is representative. + dtype : numpy dtype + Element dtype, used only for ``.fbin`` (the other formats carry their + own dtype). Defaults to float32. + + Returns + ------- + np.ndarray, shape (n, d) + """ + + if sample_size is not None and sample_size <= 0: + raise ValueError( + f"sample_size must be > 0 when provided, got {sample_size}." + ) + + ext = os.path.splitext(path)[1].lower() + + if ext == ".npy": + data = np.load(path) + if sample_size is not None and sample_size < len(data): + data = data[:sample_size] + return _finalize_dataset(data) + + if ext == ".pkl": + with open(path, "rb") as f: + data = pickle.load(f) + if not isinstance(data, np.ndarray): + data = np.array(data, dtype=np.float32) + if sample_size is not None and sample_size < len(data): + data = data[:sample_size] + return _finalize_dataset(data) + + # Default: treat as fbin (covers ".fbin" and unknown extensions). + # memmap_bin_file auto-detects the legacy uint32 / extended uint64 header. + mm = memmap_bin_file(path, dtype, mode="r") + if sample_size is not None: + mm = mm[:sample_size] + return _finalize_dataset(mm) + + +def save_fingerprint(filepath: str, fingerprint: Dict[str, Any]) -> None: + """Save a fitted cluster fingerprint to an NPZ file. + + Parameters + ---------- + filepath : str + Output path. Parent directories are created as needed. + fingerprint : dict + Dict with keys produced by :func:`fit_fingerprint`. + """ + out_dir = os.path.dirname(filepath) + if out_dir: + os.makedirs(out_dir, exist_ok=True) + + n_clusters = len(fingerprint["centroids"]) + + components_arr = np.empty(n_clusters, dtype=object) + explained_var_arr = np.empty(n_clusters, dtype=object) + + for i in range(n_clusters): + comp = fingerprint["pca_components_list"][i] + ev = fingerprint["pca_explained_var_list"][i] + if comp is not None: + components_arr[i] = comp + explained_var_arr[i] = ev + else: + # Diagonal-fallback marker: empty array. + components_arr[i] = np.array([], dtype=np.float32) + explained_var_arr[i] = np.array([], dtype=np.float32) + + to_save = dict( + centroids=fingerprint["centroids"], + densities=fingerprint["densities"], + variances_per_dim=fingerprint["variances_per_dim"], + pca_components_arr=components_arr, + pca_explained_var_arr=explained_var_arr, + pca_noise_var=fingerprint["pca_noise_var"], + pca_n_components=np.array([fingerprint["pca_n_components"]]), + norm_unit=np.array([bool(fingerprint.get("norm_unit", False))]), + ) + # Unit-norm fingerprints carry no quantile grids (generation L2-normalizes). + if fingerprint.get("norm_quantiles") is not None: + to_save["norm_quantiles"] = np.asarray( + fingerprint["norm_quantiles"], dtype=np.float32 + ) + np.savez(filepath, **to_save) + + +def load_fingerprint(filepath: str, seed: int) -> Fingerprint: + """Load a fitted cluster fingerprint from an NPZ file into a Fingerprint.""" + data = np.load(filepath, allow_pickle=True) + + n_clusters = len(data["centroids"]) + components_arr = data["pca_components_arr"] + explained_var_arr = data["pca_explained_var_arr"] + + pca_components_list = [] + pca_explained_var_list = [] + for i in range(n_clusters): + if len(components_arr[i]) > 0: + pca_components_list.append(components_arr[i]) + pca_explained_var_list.append(explained_var_arr[i]) + else: + pca_components_list.append(None) + pca_explained_var_list.append(None) + + centroids = data["centroids"] + norm_quantiles = ( + np.asarray(data["norm_quantiles"], dtype=np.float32) + if "norm_quantiles" in data.files + else None + ) + norm_unit = ( + bool(data["norm_unit"][0]) if "norm_unit" in data.files else False + ) + return Fingerprint( + nclusters=int(centroids.shape[0]), + ncols=int(centroids.shape[1]), + seed=seed, + cluster_centers=centroids, + cluster_variances=data["variances_per_dim"], + cluster_densities=data["densities"].astype(np.float64), + pca_components_list=pca_components_list, + pca_explained_var_list=pca_explained_var_list, + pca_noise_var=data["pca_noise_var"], + pca_n_components=int(data["pca_n_components"][0]), + norm_quantiles=norm_quantiles, + norm_unit=norm_unit, + ) diff --git a/python/cuvs_bench/cuvs_bench/synthesize_dataset/_verify.py b/python/cuvs_bench/cuvs_bench/synthesize_dataset/_verify.py new file mode 100644 index 0000000000..9846b95a5b --- /dev/null +++ b/python/cuvs_bench/cuvs_bench/synthesize_dataset/_verify.py @@ -0,0 +1,88 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. +# SPDX-License-Identifier: Apache-2.0 +# +"""Verify that the cluster-probing GT approximation is accurate enough.""" + +from __future__ import annotations + + +from ..backends._utils import compute_recall +from ._fingerprint import Fingerprint +from ._generate import generate_queries +from ._ground_truth import ( + compute_groundtruth_exact, + compute_groundtruth_nprobe, +) + + +def verify_groundtruth( + config: Fingerprint, + total_rows: int, + nqueries: int, + k: int, + nprobes: int, + metric: str = "sqeuclidean", +) -> dict: + """Compare nprobe GT vs exact GT. + + Parameters + ---------- + config : Fingerprint + total_rows : int + Synthetic dataset size to verify against. + nqueries : int + Number of queries to draw. + k : int + Number of neighbors. + nprobes : int + Number of clusters each query probes for the nprobe GT. + + Returns + ------- + dict with: + - ``recall``: recall of nprobe GT against exact GT + - ``nprobe_timing``, ``exact_gt_timing``: per-step timing breakdowns + """ + print( + f"Verifying nprobe GT against exact GT at " + f"total_rows={total_rows:,}, nqueries={nqueries}, " + f"k={k}, nprobes={nprobes}..." + ) + + queries = generate_queries( + nqueries=nqueries, + total_rows=total_rows, + config=config, + ) + + nprobe_idx, _, nprobe_timing = compute_groundtruth_nprobe( + queries=queries, + total_rows=total_rows, + config=config, + k=k, + nprobes=nprobes, + metric=metric, + ) + + exact_idx, _, exact_gt_timing = compute_groundtruth_exact( + queries=queries, + total_rows=total_rows, + config=config, + k=k, + metric=metric, + ) + + recall = compute_recall(nprobe_idx, exact_idx, k) + + print(f" Recall: {recall:.4f}") + + return { + "recall": recall, + "nprobe_timing": nprobe_timing, + "exact_gt_timing": exact_gt_timing, + "total_rows": total_rows, + "nqueries": nqueries, + "k": k, + "nprobes": nprobes, + } diff --git a/python/cuvs_bench/cuvs_bench/synthesize_dataset/figures/diskann_bigann.png b/python/cuvs_bench/cuvs_bench/synthesize_dataset/figures/diskann_bigann.png new file mode 100644 index 0000000000..351add0aa8 Binary files /dev/null and b/python/cuvs_bench/cuvs_bench/synthesize_dataset/figures/diskann_bigann.png differ diff --git a/python/cuvs_bench/cuvs_bench/synthesize_dataset/figures/diskann_falcon.png b/python/cuvs_bench/cuvs_bench/synthesize_dataset/figures/diskann_falcon.png new file mode 100644 index 0000000000..b051865567 Binary files /dev/null and b/python/cuvs_bench/cuvs_bench/synthesize_dataset/figures/diskann_falcon.png differ diff --git a/python/cuvs_bench/cuvs_bench/synthesize_dataset/figures/diskann_wiki.png b/python/cuvs_bench/cuvs_bench/synthesize_dataset/figures/diskann_wiki.png new file mode 100644 index 0000000000..7d3e16010c Binary files /dev/null and b/python/cuvs_bench/cuvs_bench/synthesize_dataset/figures/diskann_wiki.png differ diff --git a/python/cuvs_bench/cuvs_bench/synthesize_dataset/figures/pipeline.png b/python/cuvs_bench/cuvs_bench/synthesize_dataset/figures/pipeline.png new file mode 100644 index 0000000000..ed7fc995e6 Binary files /dev/null and b/python/cuvs_bench/cuvs_bench/synthesize_dataset/figures/pipeline.png differ