External Lance vector index over parquet — RFC + tracking issue
Summary
This is the tracking issue for the External Lance Vector Index RFC and its integration into lance-spark-knn. The work extends #2 (per-query temp Lance write) with a second, complementary path: when R is a direct parquet/Delta scan, build a Lance IVF-PQ index directly over the source parquet files — no temp Lance dataset, no column copies, post-topK projection-only fetch from source.
R is a direct parquet/Delta scan — Lance reads source files directly
This issue scopes the third path — its lance-rs implementation, JNI surface, lance-spark integration, and benchmarks.
Headline numbers — uniform-pool cluster
Cluster benchmark on Spark 3.5 with 16 executors × 4 cores requested. The bench runs an executor CPU probe before timing (ExecutorCpuCheck) and reports per-executor median compute. The probe aborts the run when slowest/fastest > 1.25×, so the recorded numbers always come from a uniform pool. The numbers below were collected on a pool with 1.01× spread on both scales.
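The abort rule can be sketched as a small check over per-executor median probe times. This is only an illustration of the slowest/fastest > 1.25× rule described above, not the ExecutorCpuCheck implementation; the function names are hypothetical.

```rust
/// Ratio of the slowest to the fastest per-executor median probe time.
/// Returns None for an empty pool or any non-positive (or NaN) sample.
fn probe_spread(medians_ms: &[f64]) -> Option<f64> {
    if medians_ms.is_empty() || medians_ms.iter().any(|&m| !(m > 0.0)) {
        return None;
    }
    let fastest = medians_ms.iter().cloned().fold(f64::INFINITY, f64::min);
    let slowest = medians_ms.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
    Some(slowest / fastest)
}

/// A pool counts as uniform when slowest/fastest does not exceed `max_ratio`
/// (1.25 in the text). A pool that cannot be measured is rejected.
fn pool_is_uniform(medians_ms: &[f64], max_ratio: f64) -> bool {
    match probe_spread(medians_ms) {
        Some(ratio) => ratio <= max_ratio,
        None => false,
    }
}
```

With medians of 300 ms and 303 ms the spread is 1.01×, which passes a 1.25× limit; 300 ms against 400 ms (1.33×) would abort the run.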
Configs:
B-narrow: per-query temp-Lance write + kNearestJoin, project rid only (apples-to-apples vs E)
C-indexed-narrow: Lance-native R with IVF-PQ index + kNearestJoin, project rid only
C-indexed-wide: Lance-native R with IVF-PQ index + kNearestJoin, project rid + 16 payload cols
E warm: IndexedNearestJoinExternal over the same parquet files, project rid, index already built
E build: one-time index build cost (separate from warm)
(Three configs answer: B − C-indexed = pure cost of per-query temp-Lance write; E vs C-indexed = cost of staying in parquet vs migrating to Lance; E build amortization = how few queries before E pays for itself.)
CPU probe: 6 executors active, pool median 300 ms, slowest/fastest = 1.01× ✓
| Config | Run 1 | Run 2 | Median |
|---|---|---|---|
| B-narrow | 98,334 ms | 107,227 ms | 107,227 ms (~1.8 min) |
| C-indexed-narrow | 8,358 ms | 8,229 ms | 8,358 ms |
| E warm | 8,602 ms | 8,454 ms | 8,602 ms |
| E build (one-time) | — | — | 103,806 ms (~1.7 min) |
| C-indexed pre-work | — | — | 80,055 ms (Lance write + index build, outside timing) |
E vs B-narrow: 12.5× speedup. E vs C-indexed: 1.03× slower — indistinguishable (both runs of each config separated by 1.5-1.7%).
Per-run consistency (uniform-pool)
| Scale / config | Run 1 vs Run 2 spread |
|---|---|
| wide-medium B-narrow | 18% |
| wide-medium C-indexed | 25% |
| wide-medium E warm | 2.7% |
| mega-medium B-narrow | 8% |
| mega-medium C-indexed | 1.5% |
| mega-medium E warm | 1.7% |
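E warm's within-config spread is at most 2.7% at both scales, and C-indexed is at 1.5% at mega-medium. The wide-medium B-narrow and C-indexed spreads (18% and 25%) are larger, but still small next to the 20× and 1.76× cross-config deltas at that scale. That is what makes the cross-config deltas trustworthy.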
Stable findings
B-narrow's per-query cost scales linearly with |R|. ~14 sec at 1M rows → ~107 sec at 10M rows. Every query pays the cost of writing R to a temp Lance dataset; that cost is bandwidth-bound on shared scratch.
E warm's per-query cost is sub-linear in |R|. Per-query went from ~7 ms at wide-medium (|L|=100) to ~9 ms at mega-medium (|L|=1000). The IVF-probe + PQ-refinement loop is bounded by nprobes × partition_size + K × refine_factor, not by |R|.
E vs B-narrow is a stable 12-20× speedup across scales. The headline finding. External-index avoids the temp-Lance write, and that write is what dominates B's cost.
E vs C-indexed depends on scale. At wide-medium E is ~1.76× faster (the smaller per-query work makes Path A's stage overhead and shuffle visible). At mega-medium they're indistinguishable. Either way, you don't pay a perf penalty for staying in parquet.
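E build amortizes essentially immediately. At mega scale, build (~104 sec) ≈ one B-narrow query (~107 sec). Counting the build, E is slightly behind after one query (~112 sec vs ~107 sec) and ahead from the second query on (~121 sec vs ~214 sec), so any workload that queries R at least twice wins on total cost vs B-narrow.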
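The amortization argument reduces to a break-even count: the smallest n for which build + n × warm is below n × baseline. A minimal sketch, using the mega-medium medians from the table as inputs; the function name is hypothetical.

```rust
/// Smallest number of queries `n` such that
/// build_ms + n * warm_ms < n * baseline_ms.
/// Returns None when the warm path is not faster per query than the baseline.
fn break_even_queries(build_ms: u64, warm_ms: u64, baseline_ms: u64) -> Option<u64> {
    if warm_ms >= baseline_ms {
        return None;
    }
    // Each query saves `saving` ms; we need n * saving > build_ms (strictly).
    let saving = baseline_ms - warm_ms;
    Some(build_ms / saving + 1)
}
```

For build = 103,806 ms, warm = 8,602 ms and baseline = 107,227 ms the per-query saving is 98,625 ms, so the break-even is the second query.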
Why E is slightly faster than C-indexed at smaller scale
Architectural — not because parquet random access beats Lance random access, but because Path A and Path C have different Spark plan shapes:
Path A (C-indexed) has 3-4 Spark stages: probe → shuffle → merge → materialize. Each stage boundary is ~50-200 ms of scheduler overhead. Total: ~200-600 ms in stage overhead.
Path C (E) has 1 fused stage. Zero stage-boundary overhead.
Path A's shuffle serializes (leftId, ScoredRowRef) for K × probeParallelism × |L| refs through Spark's network stack. Path C keeps refs in-JVM.
Path A runs probe on ~5-10 tasks (= number of Lance fragments). Path C runs on the source-size-driven heuristic, ~60 tasks for |L|=1000 — more concurrent JNI threads.
At wide-medium where total per-query work is small (~7 ms), these constant-cost differences are visible (~500 ms). At mega-medium where total per-query work is ~9 ms × 1000 queries = ~9 sec, the constants are noise.
What the comparison answers
"Is staying in parquet competitive with migrating to Lance?"
Yes. At wide-medium E is faster; at mega-medium they tie. Migration cost is real (~80 sec pre-work + ~5 GB extra storage at mega scale) and is the only reason to migrate.
"Is external-index meaningfully faster than per-query temp-Lance write?"
Yes — 12-20× faster across scales, stable on a clean cluster, growing with |R|. This is the load-bearing finding that justifies external-index for parquet-source workflows.
"Can the E build be distributed?"
Possible but not in Phase 1. Today's driver-side build at mega is ~104 sec; a distributed build (broadcast centroids/codebook, executors process partial indexes, driver merges) is estimated at ~30-40 sec. Worth doing for very large R; not load-bearing because build amortizes after 1-2 queries already. Phase 4 candidate.
Data sizes / stages / tasks per scale
| Scale | \|R\| | R-vec | R-payload | Total parquet | External index file | Per-query bytes touched (E) |
|---|---|---|---|---|---|---|
| wide-medium | 1M | 500 MB | 1 GB | ~1.5 GB | ~24 MB | ~1 MB |
| mega-medium | 10M | 5 GB | 10 GB | ~15 GB | ~250 MB | ~10 MB |
Stage / task shape from diagnostic prints in run logs (mega-medium, latest clean run):
C-indexed (Path A): probe: 5-10 tasks (= number of Lance fragments); shuffle: 128 tasks; merge + materialize: 128 tasks
E warm: 1 stage (no shuffle), 60 tasks (left's natural partition count for |L|=1000 with the source-size-driven parallelism heuristic)
E is the simplest pipeline by a wide margin. Most of B's cost is the multi-stage temp-Lance write; C's cost is index-aware fragment scan; E's cost is Spark task scheduling overhead × 20 + per-task search work.
How E actually works
left.rdd (left rows, in user's partitioning)
└─ leftKeyed = zipWithUniqueId (RDD[(leftId, Row)])
└─ ExternalFusedStage.run per Spark task: open ExternalIvfPqIndex once
via JNI (cheap: mmap manifest + index header),
then for each left row call
idx.search(query, k, nprobes, refine_factor).
Lance returns the already-refined, already-
global top-K for that ONE query — internally
merging across all probed IVF partitions.
Within the same task, batch the surviving
(file_path, row_index) keys, call
idx.fetch_rows(keys, projection) ONCE per task,
assemble Spark Rows, emit.
Key properties:
Lance returns global top-K per query. Not partial top-K to be merged across Spark tasks; Lance's IVF probe handles the cross-partition merge inside its own search() call. From Spark's perspective, each call is "give me top-K for one query, don't worry about the index's internal sharding."
No R-side shuffle. Every Spark task reads from the same shared parquet files independently via Lance. The Path A shuffle (which exists to merge fragment-grouped probe contributions) is vestigial here — we removed it after benchmarking confirmed it was a passthrough.
Materialize is index-free. idx.fetch_rows(rowKeys, projection) does one page-index-aware parquet read per distinct file and reorders to caller order. Bounded by top_K × |L| × projection_cols, completely independent of |R| or row count.
Per-query work is single-threaded inside Lance. With nprobes=16 and refine_factor=8, one Lance call probes 16 IVF partitions, fetches K * refine_factor = 80 candidates, refines via parquet random access, returns top 10 — all on one CPU. Parallelism comes from Spark splitting left rows across many executors.
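The refinement step in the last property (shortlist K × refine_factor candidates, rescore them exactly, keep the top K) can be sketched as below. This assumes squared L2 distance and candidates that already carry their full vectors; in the real path the vectors come from parquet random access, and the names here are hypothetical.

```rust
/// Squared L2 distance between two equal-length vectors.
fn l2_sq(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).map(|(x, y)| (x - y) * (x - y)).sum()
}

/// Re-rank shortlisted candidates by exact distance and keep the k nearest.
/// `candidates` holds (row id, full vector) pairs, e.g. the
/// K * refine_factor = 80 rows that survive the PQ-scored shortlist.
fn refine_top_k(query: &[f32], candidates: &[(u64, Vec<f32>)], k: usize) -> Vec<(u64, f32)> {
    let mut scored: Vec<(u64, f32)> = candidates
        .iter()
        .map(|(rid, v)| (*rid, l2_sq(query, v)))
        .collect();
    // Ascending distance; ties broken by row id so the output is deterministic.
    scored.sort_by(|a, b| a.1.total_cmp(&b.1).then(a.0.cmp(&b.0)));
    scored.truncate(k);
    scored
}
```

Because the exact rescoring only touches the shortlist, its cost depends on K × refine_factor rather than on |R|.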
Parallelism tuning
The fused probe+materialize stage previously used left.rdd.partitions.size, which collapsed to 1-2 for typical KNN (|L|=100 → small left). Fixed by source-size-driven parallelism:
targetTasks = ceil(numL / TargetRowsPerTask)
capped at min(defaultParallelism, spark.sql.shuffle.partitions)
conditional repartition UP only
At wide-medium this brought E warm from 4,343 ms (1-2 tasks) to ~600 ms (32 tasks) — 7× speedup from getting parallelism right. Documented in code; there's no static "right number of tasks" because the right knob is workload-dependent (per-task work vs per-task overhead).
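A minimal sketch of the heuristic above, written as plain arithmetic. The value of TargetRowsPerTask is not stated in this issue, so it is a parameter here, and both function names are hypothetical.

```rust
/// targetTasks = ceil(numL / TargetRowsPerTask),
/// capped at min(defaultParallelism, spark.sql.shuffle.partitions).
fn target_tasks(
    num_left: u64,
    target_rows_per_task: u64,
    default_parallelism: u64,
    shuffle_partitions: u64,
) -> u64 {
    let per_task = target_rows_per_task.max(1); // guard against division by zero
    let wanted = (num_left + per_task - 1) / per_task; // integer ceil
    let cap = default_parallelism.min(shuffle_partitions);
    wanted.min(cap).max(1)
}

/// "Repartition UP only": never shrink a left side that is already wide enough.
fn planned_partitions(current_partitions: u64, target: u64) -> u64 {
    current_partitions.max(target)
}
```

With 100 left rows, a hypothetical 4 rows per task and a cap of 32, the target is 25 tasks; a left side that arrives with 2 partitions is repartitioned to 25, while one that already has 40 is left alone.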
Why a second path
#2 shipped per-query temp-Lance write to handle non-Lance R. That path is general-purpose — works on any DataFrame including subqueries — but pays a per-query cost proportional to R's size (it copies all projected columns into a temp Lance dataset).
For the specific case of "R is a direct parquet/Delta scan", that copy is wasted work: parquet is the source of truth, and Lance can read parquet directly. The external-index path:
Builds a Lance IVF-PQ index over the source parquet files (Lance reads them; no temp write)
At query time, probes the IVF + refines via page-index-aware random access against the same source parquet
Post-topK materializes projection columns from source parquet for the surviving top-K rows only
The materialize win: temp-Lance copies all R rows × all projected columns; external-index reads top_K × |L| × projection_cols cells. For 1M-row R, top_K=10, |L|=100, 5 projection cols → external-index touches 5,000 cells instead of 1M × 5 = 5M.
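The cell counts for that example can be checked with two one-line formulas. This is only the arithmetic from the paragraph above; the function names are hypothetical.

```rust
/// Cells copied by the temp-Lance path: every R row times every projected column.
fn temp_copy_cells(num_r: u64, projection_cols: u64) -> u64 {
    num_r * projection_cols
}

/// Cells read by the external-index path: only the surviving top-K rows per left row.
fn external_fetch_cells(top_k: u64, num_l: u64, projection_cols: u64) -> u64 {
    top_k * num_l * projection_cols
}
```

For |R| = 1M, top_K = 10, |L| = 100 and 5 projection columns this gives 5,000,000 cells against 5,000, a factor of 1,000.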
Status — what shipped
lance-rs Phase 1 — ✅ end-to-end
New module at rust/lance/src/index/vector/external/:
fetch.rs — post-topK random row fetch with arbitrary projection
Plus pub fn write_ivf_pq_file_external(&ObjectStore, &Path, ...) in ivf.rs parallel to the existing write_ivf_pq_file_from_existing_index, with the &Dataset parameter dropped.
Tests: 14 unit tests + 1 integration test (tests/external_index_phase1.rs). Covers build → open → search recall above threshold → fetchRows projection in input order with duplicates → RowFilter exclusion. All passing.
Total: ~1,100 LoC + ~250 LoC tests in lance-rs.
lance-jni + Java surface — ✅
JNI module java/lance-jni/src/external_index.rs — nativeBuild, nativeOpen, nativeClose, nativeSearch, nativeFetchRows, plus accessors. RowFilter exposed as a packed LE byte[] of (file_id << 32) | row_index deleted rids — sidesteps cross-language callbacks while still supporting Delta DV / Iceberg position deletes.
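The packed layout can be illustrated as follows. This sketch assumes each deleted rid is one 8-byte little-endian u64 with a 32-bit file id in the high half and a 32-bit row index in the low half, which is one reading of the (file_id << 32) | row_index description; the function names are hypothetical and this is not the JNI code.

```rust
/// Pack one row id: file id in the high 32 bits, row index in the low 32 bits.
fn pack_rid(file_id: u32, row_index: u32) -> u64 {
    ((file_id as u64) << 32) | row_index as u64
}

/// Serialize deleted rids as consecutive little-endian u64 values.
fn pack_deleted_rids(rids: &[(u32, u32)]) -> Vec<u8> {
    let mut out = Vec::with_capacity(rids.len() * 8);
    for &(file_id, row_index) in rids {
        out.extend_from_slice(&pack_rid(file_id, row_index).to_le_bytes());
    }
    out
}

/// Decode the byte array back into (file_id, row_index) pairs.
/// Trailing bytes that do not form a full 8-byte value are ignored.
fn unpack_deleted_rids(bytes: &[u8]) -> Vec<(u32, u32)> {
    bytes
        .chunks_exact(8)
        .map(|chunk| {
            let mut buf = [0u8; 8];
            buf.copy_from_slice(chunk);
            let v = u64::from_le_bytes(buf);
            ((v >> 32) as u32, v as u32)
        })
        .collect()
}
```

A flat byte array like this crosses the JNI boundary as a single byte[] argument, which is why no callback into the JVM is needed per candidate row.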
Java classes under org.lance.index.external.*:
ExternalIvfPqIndex (handle, AutoCloseable)
ExternalIvfPqIndexParams (builder)
SearchResult
ParquetRowKey
The Java surface is independently usable from any JVM caller — Trino, Presto, a microservice, etc. — without any Spark dependency.
New files in lance-spark-knn_2.12. The integration ships two API shapes over the same external index:
Bulk join (the primary case for this RFC)
IndexedNearestJoinExternal.scala — public DataFrame API entry point: df.kNearestJoin(parquetCorpus, ...) distributes one probe per left row across executors
internal/ExternalIndexProbe.scala — Scala wrapper around the JNI handle, decodes Arrow IPC stream from fetchRows into Spark row maps
internal/ExternalIndexLifecycle.scala — driver-side cache (SHA-256-keyed by sorted file paths + vector column + params) + cleanup on SparkListenerApplicationEnd and JVM shutdown. Reuses the existing LanceTempLifecycle cleanup machinery.
internal/ExternalFusedStage.scala — fused probe + materialize stage with source-size-driven parallelism (targetTasks = ceil(numL / TargetRowsPerTask), capped at min(defaultParallelism, spark.sql.shuffle.partitions)). No shuffle.
Driver-side single-query API (RAG / point-lookup retrieval)
LanceParquetIndex.scala — SparkSession-aware Scala wrapper: build / buildIfMissing / open / search / searchToDF / fetchRowsToDF. Single probes run on the driver, not as a Spark stage.
The single-query path is a separate API shape because the bulk join is the wrong shape for it — kNearestJoin(1RowDf, parquetCorpus) would build a 1-partition queries DataFrame, ship one row to one executor, run the same probe loop, ship one row back. Spark task launch + stage submission + result fetch is ~100-300 ms; the driver-local JNI call is ~1-5 ms warm. Wrapping the single-query through Spark would be 30-100× slower than skipping Spark entirely.
// Example: driver-side single-query, returning a DataFrame for downstream pipeline use
implicit val s: SparkSession = spark
val idx = LanceParquetIndex.buildIfMissing(spark, filePaths, vectorColumn = "vec")
try {
  val topK: DataFrame = idx.searchToDF(qvec, k = 10, projection = Seq("doc_id", "title", "url"))
  val enriched = topK.join(spark.table("metadata"), Seq("doc_id"))
} finally idx.close()
buildIfMissing shares the SHA-256-keyed cache with IndexedNearestJoinExternal: an index built earlier in the same Spark application by either path is automatically reused by the other.
For non-Spark JVM callers, the underlying org.lance.index.external.ExternalIvfPqIndex Java handle is independently usable — no org.apache.spark dependency needed. That covers the Trino / Presto / Java microservice / Jupyter shape.
SQL Catalyst integration (auto-routing df.kNearestJoin to Path C when R is a parquet/Delta scan) is deferred to Phase 3 of this issue — same staging the original kNearestJoin shipped in.
Test bench design
IndexedNearestJoinExternalBenchmark.scala (sibling to IndexedNearestJoinTempRBenchmark from #2). Runs all configs on the SAME data in the SAME job, no cross-run noise:
C-indexed-narrow / C-indexed-wide: Lance-native R + IVF-PQ index + kNearestJoin (apples-to-apples reference: R fully migrated to Lance with the same algorithm as E)
E: IndexedNearestJoinExternal over the same parquet files
F (skipped via flag): Spark MLlib BucketedRandomProjectionLSH — informally measured, strictly worse than every other config (algorithmic argument from approxSimilarityJoin's O(|R| × numHashTables) shuffle); not measured at mega scale to avoid burning cluster time on numbers that all read "LSH is much slower."
Scales:
Scale("wide-tiny", numR = 100000, numL = 100, dim = 128, numPayloadCols = 16),
Scale("wide-medium", numR = 1000000, numL = 100, dim = 128, numPayloadCols = 16),
Scale("wide-large", numR = 1000000, numL = 100, dim = 128, numPayloadCols = 64),
Scale("mega-medium", numR = 10000000, numL = 1000, dim = 128, numPayloadCols = 16),
Each payload col is 64 bytes UTF-8 — wide R has substantial column-copy cost.
Bench plumbing notes:
BENCH_CONFIGS env var (added in a7ae689) selects configs at runtime so a single binary supports B-only, C-only, BCE, etc.
cleanupSiblingScratchDirs helper sweeps knn-bench-data-* siblings before each run — without it, mega runs hit "Disk quota exceeded" on shared scratch volumes.
Cloud-storage (abfss) — Phase 1.6 follow-up
The numbers above are uniform-pool runs against file:// cluster scratch. When E runs against abfss:// source parquet on Databricks (DBR 16.4 LTS, 2 workers × 4 cores), the per-query refinement I/O dominates rather than the local NVMe path. We added a Phase 1.6 optimization layer on top of Phase 1.5 — same external-index API, same recall, no change to user code. Tracking on wide-tiny-l10 (|R|=100K, |L|=100, dim=128, 16 payload cols):
| Step | Time | Change |
|---|---|---|
| ParquetMetaCache (per-handle footer + page-index reuse across queries) | 7,471 ms | -38% |
| Tuned coalesce gap to 4 MiB + dispatched PQ via compute_distances | 7,201 ms | -40% |
| search_batch — per-task unioned refinement read | 2,978 ms | -75% |
local-fs reference for the same scale: ~1,200 ms. After the batch fix the abfss path is ~2.5× over local-fs (vs. ~10× before). The residual gap is now CPU-bound on probe+PQ, not I/O.
What changed in each step
CoalescingParquetReader (parquet_source.rs) — parquet-rs's default OBJECT_STORE_COALESCE_DEFAULT = 1 MiB and hardcoded 10-way fan-out are tuned for cluster-local NVMe. On abfss where each round-trip is 30-50 ms, we wrap ParquetObjectReader to merge ranges with a tunable gap and fetch with tunable concurrency.
ParquetMetaCache (parquet_source.rs + open.rs) — every per-query open_parquet_async was re-doing ObjectStore::from_uri + head + footer + page-index fetch (3 abfss round trips). Cache lives on OpenedExternalIndex and stores (ObjectStore, Path, file_size, ArrowReaderMetadata). On hit, ParquetRecordBatchStreamBuilder::new_with_metadata skips all three. Same shape as Spark's ParquetIOMetadataCache, DuckDB's parquet_metadata_cache, and parquet-rs's own new_with_metadata example.
SIMD PQ via compute_distances (search.rs) — replaced the per-row scalar scoring loop with ProductQuantizer::compute_distances, dispatching to the SIMD distance kernels in lance-index. Required transposing PQ codes from row-major (on-disk) to column-major before scoring; one transpose per partition is far cheaper than N scalar scoring loops.
search_batch (search.rs + JNI + Java + Scala) — the dominant fix. pub async fn search_batch(opened, queries, k, nprobes, refine_factor, filter) -> Vec<Vec<SearchResult>>. The fused Spark stage now collects the entire partition's left rows + queries first, then issues ONE batched probe call. Inside Lance, the batch path:
Probes + PQ-scores each query independently
Unions the candidate (file_path, row_in_file) pairs across all queries
Issues ONE refinement read per distinct file covering the unioned set
Looks each query's candidates up from the shared per-file buffer for exact distance + top-K
Per-task cost goes from N × (probe + pq + refine_io) to N × (probe + pq) + refine_io. With a 12-13-query/task batch, the I/O term collapses by ~12×.
extidx_search_batch n_queries=12 total=~2300ms
probe_pq ≈ 1800 ms (12 × ~150 ms — pure CPU, sequential within task)
union/topk ≈ 0 ms
refine_open ≈ 40 ms (one ParquetMetaCache miss per task, then hits)
refine_io ≈ 470 ms (ONE ~50 MB byte-range fetch covering all candidates)
refine_files=1 (one source parquet at this scale); candidate_pairs=960 (12 queries × 80 refine candidates). After dedup the per-file refinement read covers ~920 unique rows.
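The union step and the cost model above can be sketched together. The first function groups candidate (file_path, row_in_file) pairs by file and deduplicates them, so each file is read once per task; the other two encode the before/after cost formulas. Names are hypothetical, and applying the same refine_io figure to both formulas follows the simplification in the text rather than a measurement.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Union the candidates of all queries in a task, grouped by source file.
/// Duplicate rows requested by several queries are kept once.
fn union_candidates(per_query: &[Vec<(String, u64)>]) -> BTreeMap<String, BTreeSet<u64>> {
    let mut by_file: BTreeMap<String, BTreeSet<u64>> = BTreeMap::new();
    for candidates in per_query {
        for (path, row) in candidates {
            by_file.entry(path.clone()).or_default().insert(*row);
        }
    }
    by_file
}

/// Per-query path: every query pays probe + PQ and its own refinement read.
fn per_query_cost_ms(n: u64, probe_pq_ms: u64, refine_io_ms: u64) -> u64 {
    n * (probe_pq_ms + refine_io_ms)
}

/// Batched path: probe + PQ per query, one shared refinement read per task.
fn batched_cost_ms(n: u64, probe_pq_ms: u64, refine_io_ms: u64) -> u64 {
    n * probe_pq_ms + refine_io_ms
}
```

Plugging in the logged figures (12 queries, ~150 ms probe+PQ each, ~470 ms refinement read) gives 2,270 ms for the batched formula, close to the ~2,300 ms total in the log line.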
Why we stopped at -75%
Per-task probe+PQ is now 1.8 s of the 2.3 s batch wall-clock — CPU-bound, not I/O. Spark's task-per-core model already saturates all 8 cluster cores with 8 concurrent tasks, so intra-task parallelism (rayon across a batch's queries) would steal from the other 7 tasks. Further gains require more cluster cores, dropping to 4-bit PQ for better SIMD utilization, or a recall trade-off via lower nprobes/refine_factor. None of those are appropriate as default tunings; they're per-workload knobs.
lance-spark-knn_2.12/src/main/scala/org/lance/spark/knn/internal/ExternalFusedStage.scala — fusedPartition collects all queries before one batched probe call
New test: search_batch_matches_per_query_search asserts batched + per-query results agree exactly
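The range merging described for CoalescingParquetReader (merge nearby byte ranges under a tunable gap, so fewer round trips reach the object store) can be sketched in isolation. This is an illustration of the idea only, not the code in parquet_source.rs; coalesce_ranges and max_gap are hypothetical names, and the tunable fetch concurrency is not shown.

```rust
use std::ops::Range;

/// Merge byte ranges whose gap to the previous range is at most `max_gap` bytes.
/// Input may be unsorted and overlapping; empty ranges are dropped.
fn coalesce_ranges(ranges: &[Range<u64>], max_gap: u64) -> Vec<Range<u64>> {
    let mut sorted: Vec<Range<u64>> = ranges
        .iter()
        .filter(|r| r.start < r.end)
        .cloned()
        .collect();
    sorted.sort_by_key(|r| r.start);

    let mut merged: Vec<Range<u64>> = Vec::new();
    for r in sorted {
        let mut absorbed = false;
        if let Some(last) = merged.last_mut() {
            // Close enough to the previous range: extend it instead of
            // issuing a separate request.
            if r.start <= last.end.saturating_add(max_gap) {
                last.end = last.end.max(r.end);
                absorbed = true;
            }
        }
        if !absorbed {
            merged.push(r);
        }
    }
    merged
}
```

A larger gap trades some wasted bytes for fewer requests, which is the right trade when each round trip costs tens of milliseconds.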
Cross-compile to Linux for cluster
Cluster needs nativelib/linux-x86-64/liblance_jni.so but local builds only produce darwin-aarch64. Tried:
# One-time:
brew install zig
cargo install cargo-zigbuild
rustup target add x86_64-unknown-linux-gnu
# Per build:
cd $LANCE_REPO/java/lance-jni
cargo zigbuild --target x86_64-unknown-linux-gnu --release
# Graft into the lance-core JAR before lance-spark fat JAR build:
cd /tmp/jar-patch
mkdir -p nativelib/linux-x86-64
cp $LANCE_REPO/java/lance-jni/target/x86_64-unknown-linux-gnu/release/liblance_jni.so nativelib/linux-x86-64/
jar uf ~/.m2/repository/org/lance/lance-core/$VERSION/lance-core-$VERSION.jar nativelib/linux-x86-64/liblance_jni.so
This is a contributor-side workflow note; upstream lance-rs CI builds for all 3 architectures and publishes the multi-arch JAR.
Update (Phase 1.6): cross build --release --target x86_64-unknown-linux-gnu also works using the Ubuntu-based ghcr.io/cross-rs/x86_64-unknown-linux-gnu:main image (glibc 2.31, GCC 9.4) plus a small Cross.toml that pre-installs protoc 25.1, plus a [patch.crates-io] to a vendored copy of aws-lc-sys 0.41.0 with the gcc-bug-95189 self-check assertion stripped (the assert mis-fires under QEMU emulation on darwin-arm64; the actual crypto code is fine). Either path produces an equivalent linux .so.
Outstanding questions / known gaps
1. Recall
External-index uses Lance's existing IVF-PQ code path, so it has the same recall profile as the temp-Lance path (which uses the same IVF-PQ code on the temp dataset). The 12-20× speedup is at equal approximation quality, not by trading recall.
Realistic recall@10 with nprobes=16, refine_factor=8 on the bench config: 0.85-0.97 depending on data distribution. For exact recall=1.0, the RFC's Phase 2 IVF-FLAT path is the answer (stores full vectors per posting list; trade index size for exact distance).
2. Index invalidation / staleness handling — gap
Phase 1 manifest stores only (file_path, num_rows) per file. No content hash, no mtime, no size validation at open(). The driver-side cache key covers sorted file paths, vector column, and params, but nothing about file content: if file CONTENT changes but the path stays the same, the stale cached index gets reused.
For in-process workloads (one Spark application = one cache scope), this is fine: rebuild fresh on the next application start. For persistent indexes across Spark sessions this becomes critical.
Phase 2 follow-up (~50 LoC):
// Manifest gains per-file fingerprint:
struct ManifestFileEntry {
    file_path: String,
    num_rows: u64,
    size_bytes: u64,          // NEW
    mtime_unix: u64,          // NEW
    footer_sha256: [u8; 32],  // NEW — parquet footer hash, not full content
}
// open() validates each file:
//   exists → size + mtime match → fast path
//   drift → recompute footer hash → fail with StaleIndex(file_path)
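One possible shape for that validation is sketched below. It is an assumption-laden illustration, not the planned implementation: the Observed struct, the OpenError::Missing variant and the footer_hash callback are hypothetical, and the sketch assumes a file whose size or mtime drifted is still accepted when its recomputed footer hash matches the manifest.

```rust
#[derive(Debug, PartialEq)]
enum OpenError {
    Missing(String),    // hypothetical: file no longer exists
    StaleIndex(String), // file content changed since the index was built
}

struct ManifestFileEntry {
    file_path: String,
    num_rows: u64,
    size_bytes: u64,
    mtime_unix: u64,
    footer_sha256: [u8; 32],
}

/// What a cheap stat-style call reports for the file today.
struct Observed {
    size_bytes: u64,
    mtime_unix: u64,
}

/// Validate one manifest entry. `footer_hash` is only invoked on drift,
/// so the common case costs no extra read.
fn validate_entry<F>(
    entry: &ManifestFileEntry,
    observed: Option<&Observed>,
    footer_hash: F,
) -> Result<(), OpenError>
where
    F: FnOnce() -> [u8; 32],
{
    let obs = observed.ok_or_else(|| OpenError::Missing(entry.file_path.clone()))?;
    if obs.size_bytes == entry.size_bytes && obs.mtime_unix == entry.mtime_unix {
        return Ok(()); // fast path: size and mtime both match
    }
    if footer_hash() == entry.footer_sha256 {
        Ok(()) // assumption: metadata drifted but the footer is unchanged
    } else {
        Err(OpenError::StaleIndex(entry.file_path.clone()))
    }
}
```

Passing the hash as a closure keeps the expensive footer read off the fast path; whether a matching footer hash should be enough to accept a drifted file is a design decision this sketch does not settle.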
3. Tolerating data changes
| Change | External-index behavior | Mitigation |
|---|---|---|
| Append (new parquet file) | New rows invisible to old index | Phase 4 append() or build delta index over new file + union results |
| Delete (Delta DV / Iceberg position deletes) | Already supported via RowFilter byte-array packed deleted rids | ExternalIvfPqIndex.packDeletedRids Java helper |
| In-place modify (Delta MERGE that rewrites file) | rids may silently shift; PQ codes stale | Must rebuild. Fingerprint check at open() (above) catches at open time. |
| Tiny modify (a few vectors updated, no row-shift) | Stale PQ codes on those rows; scores wrong but in ballpark | Acceptable for "approximate is fine" workloads |
For production: pin index to a snapshot ID, rebuild when snapshot changes (cluster build is ~30 sec for 1M × dim=128, ~101 sec for 10M × dim=128), use RowFilter for delete-only changes.
4. SQL Catalyst integration
Phase 1 of this issue ships the DataFrame API entry point only (IndexedNearestJoinExternal.apply(...)). Catalyst rule integration — so users can write SELECT * FROM left LATERAL VIEW NEAREST(parquet_R, k) ... and the rule auto-routes to external-index — is Phase 3 work.
The same staging the original kNearestJoin shipped in: ship the DataFrame API first, prove the win, then promote to Catalyst with full plan-shape visibility.
5. Distributed E build — feasible, Phase 4
Today E build runs on the driver via a single JNI call:
ExternalIvfPqIndex::build(files, ...)
├─ ParquetVectorSource::sample(n) single thread, blocking I/O
├─ KMeans::new(training, k) multi-thread (rayon, within JVM)
├─ PQBuildParams::build(training, mt) multi-thread (rayon)
├─ for each batch in iter_batches():
│ IvfTransformer::with_pq.transform() single async runtime
│ shuffle_dataset.bin_by_partition() single async runtime
└─ write_ivf_pq_file_external() single writer
KMeans + PQ training are rayon-parallel within one JVM (~32 cores worth on the driver). The per-batch transform + shuffle is the sequential bottleneck.
Distributed shape (Phase 4):
Driver samples + trains KMeans + PQ codebook
Broadcast centroids + codebook to executors
Each executor reads its assigned parquet files, applies IvfTransformer, writes a partial index file
Driver merges partial indexes (concatenate posting lists per partition)
Realistic speedup at mega-medium: build at ~101 sec now → ~30-40 sec with distributed. Lance has the building blocks (distributed_indexing). Worth it for very large R; not load-bearing for Phase 1 because build amortizes after 1-2 queries already.
Estimated ~400 LoC in lance-rs (mostly merge logic) + ~200 LoC in lance-spark.
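The driver-side merge in the last step of the distributed shape ("concatenate posting lists per partition") can be sketched with a simplified data model. Here a posting list is just a list of row ids keyed by IVF partition id; real posting lists also carry PQ codes, and the type and function names are hypothetical.

```rust
use std::collections::BTreeMap;

/// Simplified partial index: IVF partition id -> row ids assigned to it.
type PostingLists = BTreeMap<u32, Vec<u64>>;

/// Merge the partial indexes written by executors by concatenating the
/// posting lists that share a partition id. This is valid because every
/// executor assigns rows using the same broadcast centroids.
fn merge_partials(partials: Vec<PostingLists>) -> PostingLists {
    let mut merged = PostingLists::new();
    for partial in partials {
        for (partition_id, mut rows) in partial {
            merged.entry(partition_id).or_default().append(&mut rows);
        }
    }
    merged
}
```

Concatenation needs no re-clustering, which is why most of the estimated work sits in file-level merge logic rather than in new training code.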
Implementation plan, by phase
| Phase | Goal | Code (LoC) | Status |
|---|---|---|---|
| 1 | lance-rs external IVF-PQ over parquet, full build/open/search/fetch | ~1,100 + ~250 tests | ✅ done |
| 1.5 | JNI + Java surface + lance-spark integration (DataFrame API for join + driver-side single-query API) | | |
Preferred home for IndexedNearestJoinExternal: alongside IndexedNearestJoin in lance-spark-knn_2.12 (current plan), or factor out shared probe/merge primitives into lance-spark-base so future engines reuse them?
Naming: IndexedNearestJoinExternal vs IndexedNearestJoinOverParquet vs kNearestJoinExternal. The "no temp write, source files are parquet" property is load-bearing.
Combined with #7 (full SPIP across all three paths), the lance-spark KNN story now covers all real-world R shapes.
Wide-medium (|R|=1M, dim=128, |L|=100, 16 string payload cols, ~1.5 GB)
CPU probe: 6 executors active, pool median 301 ms, slowest/fastest = 1.01× ✓
E vs B-narrow: 20.3× speedup. E vs C-indexed: 1.76× faster (689 ms vs 1,212 ms).
Mega-medium (|R|=10M, dim=128, |L|=1000, 16 payload cols, ~15 GB)
CPU probe: 6 executors active, pool median 300 ms, slowest/fastest = 1.01× ✓
E vs B-narrow: 12.5× speedup. E vs C-indexed: 1.03× slower — indistinguishable (both runs of each config separated by 1.5-1.7%).
Files in the lance-rs Phase 1 module at rust/lance/src/index/vector/external/:
mod.rs — ExternalIvfPqIndex handle (build / open / search / fetch_rows)
types.rs — ParquetFileSpec, SearchResult, ParquetRowKey, RowFilter trait
params.rs — ExternalIvfPqIndexParams builder
parquet_source.rs — ParquetVectorSource (training sample + rid-annotated batch stream); also handles List<Float32> from JVM parquet writers (Spark's parquet writer doesn't preserve FixedSizeList)
manifest.rs — sidecar manifest.json with parquet file list + build params
build.rs — orchestrates KMeans → PQ → IvfTransformer → shuffle → write
open.rs — reads manifest + index file, decodes IVF + PQ from existing protobuf
search.rs — IVF probe + per-file parquet refinement; pluggable RowFilter
fetch.rs — post-topK random row fetch with arbitrary projection
5/5 JNI smoke tests pass (handle load, exception bridge, packed-rid round-trip, params builder).
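The packed-rid layout described above can be sketched as follows. The function names are hypothetical, and the sketch assumes that each rid is a 64-bit value with a 32-bit file_id in the high half and a 32-bit row_index in the low half, serialized as 8 little-endian bytes; the issue text gives the (file_id << 32) | row_index formula and the LE byte[] encoding but not the exact field widths.

```rust
/// Pack (file_id, row_index) pairs into a flat byte buffer.
/// Each pair becomes one u64 equal to (file_id << 32) | row_index,
/// written as 8 little-endian bytes. Field widths are an assumption.
pub fn pack_deleted_rids(rids: &[(u32, u32)]) -> Vec<u8> {
    let mut out = Vec::with_capacity(rids.len() * 8);
    for &(file_id, row_index) in rids {
        let rid = ((file_id as u64) << 32) | (row_index as u64);
        out.extend_from_slice(&rid.to_le_bytes());
    }
    out
}

/// Decode a packed buffer back into (file_id, row_index) pairs.
/// Returns None when the buffer length is not a multiple of 8 bytes.
pub fn unpack_deleted_rids(bytes: &[u8]) -> Option<Vec<(u32, u32)>> {
    if bytes.len() % 8 != 0 {
        return None;
    }
    let mut out = Vec::with_capacity(bytes.len() / 8);
    for chunk in bytes.chunks_exact(8) {
        let mut buf = [0u8; 8];
        buf.copy_from_slice(chunk);
        let rid = u64::from_le_bytes(buf);
        out.push(((rid >> 32) as u32, (rid & 0xFFFF_FFFF) as u32));
    }
    Some(out)
}
```

A flat byte buffer crosses the JNI boundary in one copy, which is why this shape avoids a per-row callback from Rust into the JVM.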
lance-spark integration — ✅ additive to PR #2
New files in lance-spark-knn_2.12. The integration ships two API shapes over the same external index:
Bulk join (the primary case for this RFC)
IndexedNearestJoinExternal.scala: public DataFrame API entry point. df.kNearestJoin(parquetCorpus, ...) distributes one probe per left row across executors
internal/ExternalIndexProbe.scala: Scala wrapper around the JNI handle, decodes Arrow IPC stream from fetchRows into Spark row maps
internal/ExternalIndexLifecycle.scala: driver-side cache (SHA-256-keyed by sorted file paths + vector column + params) + cleanup on SparkListenerApplicationEnd and JVM shutdown. Reuses the existing LanceTempLifecycle cleanup machinery.
internal/ExternalFusedStage.scala: fused probe + materialize stage with source-size-driven parallelism (targetTasks = ceil(numL / TargetRowsPerTask), capped at min(defaultParallelism, spark.sql.shuffle.partitions)). No shuffle.
IndexedNearestJoinExternalTest.scala: 16 left vectors × 2 parquet files × 320 rows each, top-K returned, scores monotone, oracle hit rate ≥ K/2.
Driver-side single-query API (RAG / point-lookup retrieval)
LanceParquetIndex.scala: SparkSession-aware Scala wrapper with build / buildIfMissing / open / search / searchToDF / fetchRowsToDF. Single probes run on the driver, not as a Spark stage.
LanceParquetIndexTest.scala: 5 tests covering JNI round-trip, DataFrame schema shape, projection materialization, fetchRows caller order, and buildIfMissing cache reuse with IndexedNearestJoinExternal.
The single-query path is a separate API shape because the bulk join is the wrong shape for it: kNearestJoin(1RowDf, parquetCorpus) would build a 1-partition queries DataFrame, ship one row to one executor, run the same probe loop, and ship one row back. Spark task launch + stage submission + result fetch is ~100-300 ms; the driver-local JNI call is ~1-5 ms warm. Wrapping the single query through Spark would be 30-100× slower than skipping Spark entirely.
buildIfMissing shares the SHA-256-keyed cache with IndexedNearestJoinExternal: an index built earlier in the same Spark application by either path is automatically reused by the other.
For non-Spark JVM callers, the underlying org.lance.index.external.ExternalIvfPqIndex Java handle is independently usable; no org.apache.spark dependency is needed. That covers the Trino / Presto / Java microservice / Jupyter shape.
SQL Catalyst integration (auto-routing df.kNearestJoin to Path C when R is a parquet/Delta scan) is deferred to Phase 3 of this issue, the same staging the original kNearestJoin shipped in.
Test bench design
IndexedNearestJoinExternalBenchmark.scala (sibling to IndexedNearestJoinTempRBenchmark from #2). Runs all configs on the SAME data in the SAME job, no cross-run noise:
crossJoin + min_by_k brute-force baseline (skipped on wide payload; pair count dominates regardless)
kNearestJoin, project rid only
kNearestJoin, project rid + N payload
kNearestJoin (apples-to-apples reference: R fully migrated to Lance with the same algorithm as E)
IndexedNearestJoinExternal over the same parquet files
BucketedRandomProjectionLSH: informally measured, strictly worse than every other config (algorithmic argument from approxSimilarityJoin's O(|R| × numHashTables) shuffle); not measured at mega scale to avoid burning cluster time on numbers that all read "LSH is much slower."
Scales:
Each payload col is 64 bytes UTF-8 — wide R has substantial column-copy cost.
Bench plumbing notes:
BENCH_CONFIGS env var (added in a7ae689) selects configs at runtime so a single binary supports B-only, C-only, BCE, etc.
cleanupSiblingScratchDirs helper sweeps knn-bench-data-* siblings before each run; without it, mega runs hit "Disk quota exceeded" on shared scratch volumes.
Cloud-storage (abfss): Phase 1.6 follow-up
The numbers above are uniform-pool runs against file:// cluster scratch. When E runs against abfss:// source parquet on Databricks (DBR 16.4 LTS, 2 workers × 4 cores), the per-query refinement I/O dominates, unlike on the local NVMe path. We added a Phase 1.6 optimization layer on top of Phase 1.5: same external-index API, same recall, no change to user code. Tracking on wide-tiny-l10 (|R|=100K, |L|=100, dim=128, 16 payload cols):
CoalescingParquetReader (64 MiB gap, 32-parallel coalesce_ranges)
ParquetMetaCache (per-handle footer + page-index reuse across queries)
SIMD PQ via compute_distances
search_batch: per-task unioned refinement read
Local-fs reference for the same scale: ~1,200 ms. After the batch fix the abfss path is ~2.5× over local-fs (vs. ~10× before). The residual gap is now CPU-bound on probe+PQ, not I/O.
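The range-merging idea behind the CoalescingParquetReader step can be sketched in a few lines. This is an illustration only: the coalesce function below is a hypothetical, synchronous stand-in, not the reader in parquet_source.rs and not the object_store coalesce_ranges routine (which also performs the fetches). It shows how a larger gap threshold, such as the 64 MiB value mentioned above, turns many small reads into a few large ones at the cost of fetching some bytes that are then discarded.

```rust
use std::ops::Range;

/// Merge byte ranges whose gap to the previous range is at most `max_gap`.
/// Input ranges may be unsorted and may overlap; output is sorted and disjoint.
pub fn coalesce(mut ranges: Vec<Range<u64>>, max_gap: u64) -> Vec<Range<u64>> {
    ranges.sort_by_key(|r| r.start);
    let mut merged: Vec<Range<u64>> = Vec::new();
    for r in ranges {
        if let Some(last) = merged.last_mut() {
            // Close enough to the previous range: extend it instead of
            // issuing a separate request.
            if r.start <= last.end.saturating_add(max_gap) {
                last.end = last.end.max(r.end);
                continue;
            }
        }
        merged.push(r);
    }
    merged
}
```

With 30-50 ms per round trip, each merge removes one request from the critical path, so over-reading a gap is usually cheaper than a second request until the gap becomes large relative to the link bandwidth.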
What changed in each step
CoalescingParquetReader (parquet_source.rs): parquet-rs's default OBJECT_STORE_COALESCE_DEFAULT = 1 MiB and hardcoded 10-way fan-out are tuned for cluster-local NVMe. On abfss, where each round-trip is 30-50 ms, we wrap ParquetObjectReader to merge ranges with a tunable gap and fetch with tunable concurrency.
ParquetMetaCache (parquet_source.rs + open.rs): every per-query open_parquet_async was re-doing ObjectStore::from_uri + head + footer + page-index fetch (3 abfss round trips). Cache lives on OpenedExternalIndex and stores (ObjectStore, Path, file_size, ArrowReaderMetadata). On hit, ParquetRecordBatchStreamBuilder::new_with_metadata skips all three. Same shape as Spark's ParquetIOMetadataCache, DuckDB's parquet_metadata_cache, and parquet-rs's own new_with_metadata example.
SIMD PQ via compute_distances (search.rs): replaced the per-row scalar scoring loop with ProductQuantizer::compute_distances, dispatching to the SIMD distance kernels in lance-index. Required transposing PQ codes from row-major (on-disk) to column-major before scoring; one transpose per partition is far cheaper than N scalar scoring loops.
search_batch (search.rs + JNI + Java + Scala): the dominant fix. pub async fn search_batch(opened, queries, k, nprobes, refine_factor, filter) -> Vec<Vec<SearchResult>>. The fused Spark stage now collects the entire partition's left rows + queries first, then issues ONE batched probe call. Inside Lance, the batch path:
(file_path, row_in_file) pairs across all queries
Per-task cost goes from N × (probe + pq + refine_io) to N × (probe + pq) + refine_io. With a 12-13-query/task batch, the I/O term collapses by ~12×.
Per-batch breakdown (from LANCE_LOG=lance::index::vector::external=info executor stderr)
refine_files=1 (one source parquet at this scale); candidate_pairs=960 (12 queries × 80 refine candidates). After dedup the per-file refinement read covers ~920 unique rows.
Why we stopped at -75%
Per-task probe+PQ is now 1.8 s of the 2.3 s batch wall-clock: CPU-bound, not I/O. Spark's task-per-core model already saturates all 8 cluster cores with 8 concurrent tasks, so intra-task parallelism (rayon across a batch's queries) would steal from the other 7 tasks. Further gains require more cluster cores, dropping to 4-bit PQ for better SIMD utilization, or a recall trade-off via lower nprobes / refine_factor. None of those are appropriate as default tunings; they're per-workload knobs.
Files changed under Phase 1.6
rust/lance/src/index/vector/external/parquet_source.rs: CoalescingParquetReader, ParquetMetaCache, open_parquet_cached
rust/lance/src/index/vector/external/open.rs: OpenedExternalIndex owns the cache for its handle lifetime
rust/lance/src/index/vector/external/search.rs: search_batch + per-batch log::info! timing diagnostics; search reduced to thin wrapper over search_batch(&[query])
rust/lance/src/index/vector/external/mod.rs: ExternalIvfPqIndex::search_batch public API
java/lance-jni/src/external_index.rs: nativeSearchBatch
java/src/main/java/org/lance/index/external/ExternalIvfPqIndex.java: searchBatch(float[][], k, nprobes, refineFactor, deletedRids)
lance-spark-knn_2.12/src/main/scala/org/lance/spark/knn/internal/ExternalIndexProbe.scala: probeBatch
lance-spark-knn_2.12/src/main/scala/org/lance/spark/knn/internal/ExternalFusedStage.scala: fusedPartition collects all queries before one batched probe call
search_batch_matches_per_query_search asserts batched + per-query results agree exactly
Cross-compile to Linux for cluster
Cluster needs nativelib/linux-x86-64/liblance_jni.so but local builds only produce darwin-aarch64. Tried:
cross with default ghcr.io/cross-rs/x86_64-unknown-linux-gnu:main: aws-lc-sys build script blocklists GCC 6.3 due to https://gcc.gnu.org/bugzilla/show_bug.cgi?id=95189 (memcmp codegen bug)
cross with centos image (GCC 10): Rust toolchain in container too old for workspace's edition = "2024"
cargo zigbuild --target x86_64-unknown-linux-gnu --release: uses host's rustup toolchain + zig-supplied glibc-targeted clang. Built clean. 158 MB linux .so.
Workflow:
This is a contributor-side workflow note; upstream lance-rs CI builds for all 3 architectures and publishes the multi-arch JAR.
Update (Phase 1.6): cross build --release --target x86_64-unknown-linux-gnu also works using the Ubuntu-based ghcr.io/cross-rs/x86_64-unknown-linux-gnu:main image (glibc 2.31, GCC 9.4) plus a small Cross.toml that pre-installs protoc 25.1, plus a [patch.crates-io] to a vendored copy of aws-lc-sys 0.41.0 with the gcc-bug-95189 self-check assertion stripped (the assert mis-fires under QEMU emulation on darwin-arm64; the actual crypto code is fine). Either path produces an equivalent linux .so.
Outstanding questions / known gaps
1. Recall
External-index uses Lance's existing IVF-PQ code path — same recall profile as the temp-Lance path (which uses the same IVF-PQ code on the temp dataset). The 17-20× speedup is at equal approximation quality, not by trading recall.
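For reference, the recall@k metric used in this section can be computed per query as the overlap between the approximate top-k and the exact (brute-force) top-k. The sketch below is a generic definition, not the bench's own measurement code; the function name and the use of u64 row ids are assumptions, and ids within each list are assumed to be distinct.

```rust
use std::collections::HashSet;

/// recall@k for one query: |approx top-k ∩ exact top-k| / k.
/// `approx` and `exact` are row ids ordered best-first. If fewer than k
/// exact neighbours exist, k is clamped to the number available.
pub fn recall_at_k(approx: &[u64], exact: &[u64], k: usize) -> f64 {
    let k = k.min(exact.len());
    if k == 0 {
        return 0.0;
    }
    let truth: HashSet<u64> = exact.iter().take(k).copied().collect();
    let hits = approx
        .iter()
        .take(k)
        .copied()
        .filter(|id| truth.contains(id))
        .count();
    hits as f64 / k as f64
}
```

Averaging this value over all left rows gives the dataset-level figure; order within the top-k does not affect the score.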
Realistic recall@10 with nprobes=16, refine_factor=8 on the bench config: 0.85-0.97 depending on data distribution. For exact recall=1.0, the RFC's Phase 2 IVF-FLAT path is the answer (stores full vectors per posting list; trade index size for exact distance).
2. Index invalidation / staleness handling (gap)
Phase 1 manifest stores only (file_path, num_rows) per file. No content hash, no mtime, no size validation at open(). The driver-side cache identifies source files by their sorted paths only (alongside the vector column and params), so if file CONTENT changes but the path stays the same, the stale cached index gets reused.
For in-process workloads (one Spark application = one cache scope), this is fine: rebuild fresh on the next application start. For persistent indexes across Spark sessions this becomes critical.
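The gap can be made concrete with a minimal sketch that compares a path-only key with one that also folds in file size and mtime. Everything here is an assumption for illustration: the function names are hypothetical, the standard library's DefaultHasher is swapped in for SHA-256 to keep the example dependency-free (it is not suitable for keys persisted across processes), and local std::fs metadata stands in for whatever an object store would report. It does not describe the actual Phase 2 design.

```rust
use std::collections::hash_map::DefaultHasher;
use std::fs;
use std::hash::{Hash, Hasher};
use std::io;
use std::path::PathBuf;

/// Key derived from sorted paths only. An in-place rewrite of a file
/// leaves this key unchanged, so a stale index would be reused.
pub fn path_only_key(paths: &[PathBuf]) -> u64 {
    let mut sorted = paths.to_vec();
    sorted.sort();
    let mut hasher = DefaultHasher::new();
    sorted.hash(&mut hasher);
    hasher.finish()
}

/// Key derived from sorted paths plus each file's size and mtime.
/// A rewrite that changes either attribute produces a different key.
pub fn stat_key(paths: &[PathBuf]) -> io::Result<u64> {
    let mut sorted = paths.to_vec();
    sorted.sort();
    let mut hasher = DefaultHasher::new();
    for path in &sorted {
        let meta = fs::metadata(path)?;
        path.hash(&mut hasher);
        meta.len().hash(&mut hasher);
        meta.modified()?.hash(&mut hasher);
    }
    Ok(hasher.finish())
}
```

Size and mtime are cheap to check but can miss a same-size rewrite within the filesystem's timestamp resolution; a content hash closes that hole at the cost of reading the file.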
Phase 2 follow-up (~50 LoC):
3. Tolerating data changes
append() or build delta index over new file + union results
RowFilter byte-array packed deleted rids (ExternalIvfPqIndex.packDeletedRids Java helper)
open() (above) catches at open time.
For production: pin index to a snapshot ID, rebuild when snapshot changes (cluster build is ~30 sec for 1M × dim=128, ~101 sec for 10M × dim=128), use RowFilter for delete-only changes.
4. SQL Catalyst integration
Phase 1 of this issue ships the DataFrame API entry point only (IndexedNearestJoinExternal.apply(...)). Catalyst rule integration, so that users can write SELECT * FROM left LATERAL VIEW NEAREST(parquet_R, k) ... and the rule auto-routes to external-index, is Phase 3 work.
This is the same staging the original kNearestJoin shipped in: ship the DataFrame API first, prove the win, then promote to Catalyst with full plan-shape visibility.
5. Distributed E build: feasible, Phase 4
Today E build runs on the driver via a single JNI call:
KMeans + PQ training are rayon-parallel within one JVM (~32 cores worth on the driver). The per-batch transform + shuffle is the sequential bottleneck.
Distributed shape (Phase 4):
IvfTransformer, writes a partial index file
Realistic speedup at mega-medium: build at ~101 sec now → ~30-40 sec with distributed. Lance has the building blocks (distributed_indexing). Worth it for very large R; not load-bearing for Phase 1 because build amortizes after 1-2 queries already.
Estimated ~400 LoC in lance-rs (mostly merge logic) + ~200 LoC in lance-spark.
Implementation plan, by phase
append() for new parquet files without full rebuild; compact() when drift accumulates; distributed build
Branches / artifacts
external-index-rfc-draft (PR-equivalent diff in sezruby/lance)
knn-external-index (PR #5)
Asks
IndexedNearestJoinExternal: alongside IndexedNearestJoin in lance-spark-knn_2.12 (current plan), or factor out shared probe/merge primitives into lance-spark-base so future engines reuse them?
IndexedNearestJoinExternal vs IndexedNearestJoinOverParquet vs kNearestJoinExternal. The "no temp write, source files are parquet" property is load-bearing.
lance-format/lance upstream, or is a fork-first iteration preferred while design is reviewed?
References