SPIP: Lance-backed approximate nearest-neighbor join for Apache Spark — three complementary paths
A proposal to add Lance-backed indexed NearestByJoin execution to lance-spark, covering all three real-world R-side shapes: data already in Lance, data in a generic Spark subplan, and data in direct parquet / Delta scans. Includes per-component justification for why the hot paths must live in Lance (Rust + SIMD), not in Spark JVM code.
📊 Measured results summary (added 2026-06-16)
Headline across every configuration we measured on a real cluster (Spark 3.5.4, Azure ABFSS storage) and locally: native Lance (Rust) search is the performance floor, and the less work done in the Spark JVM, the faster the join runs. Distribution only helps when single-machine search is genuinely too slow (unindexed brute-force at scale) — and even then the minimal-JVM mapPartitions shape beats the staged probe→shuffle→merge→materialize pipeline.
Single-query retrieval (M=1, driver-side native Dataset.newScan(...nearest), IVF-PQ)
| Dataset | \|R\| | dim | p50 | p90 | p99 |
|---|---|---|---|---|---|
| Cohere wikipedia-2023-11-embed-v3 | 10M | 1024 | 56 ms | 182 ms | 514 ms |
| Synthetic uniform (worst case for IVF) | 100M | 128 | 157 ms | 776 ms | 1,213 ms |
| Synthetic uniform | 250M | 128 | 1,084 ms | 1,528 ms | 2,323 ms |
Sub-200 ms p50 at 100M means distributing a single indexed query through Spark cannot win: Spark's task-scheduling floor (~100-300 ms warm) is, on its own, comparable to or larger than the entire single-machine search time. Online retrieval should stay single-machine / native.
Batch KNN-join, indexed (synthetic 100M × dim=128, IVF-PQ, M=5000 queries, 8×4 cluster)
| Config | Median | vs SINGLE |
|---|---|---|
| SINGLE (driver loop) | ~490 s | 1.00× |
| SPARK_DIST (mapPartitions → LanceProbe.probe, no shuffle) | ~42 s | ~13× faster |
| SPARK (kNearestJoin staged pipeline, probeParallelism=1) | ~472 s | ~1.04× (break-even) |
At indexed scale the 3-stage pipeline never beat the dumb mapPartitions shape: its shuffle + merge + second-scan materialize is pure overhead because per-query Lance work is already tiny.
Batch KNN-join, no index (Cohere wiki 10M × dim=1024, brute-force, M=100, 8×4 cluster)
| Config | Median (100 queries) | vs SINGLE |
|---|---|---|
| SINGLE (driver loop) | ~50 min | 1.00× |
| SPARK (kNearestJoin, probeParallelism=8) | ~19.4 min | 2.6× |
| SPARK (kNearestJoin, probeParallelism=32) | ~13.9 min | 3.6× |
| SPARK_DIST (mapPartitions) | ~5.0 min | ~10× faster |
Even where distribution genuinely pays (single-machine is 30 s/query brute-force), the minimal-JVM mapPartitions shape still beats the staged pipeline by ~2.8×. The staged pipeline's per-query fan-out (replicate L × fragments, shuffle, merge, materialize) creates M × N_frag × K intermediate JVM rows that don't otherwise need to exist.
Native batch-query API (lance-core #6828, shared flat-KNN scan)
Local (1M × dim=512, flat KNN): batched Scanner::nearest over M queries shares one scan/decode and gives ~1.5× at M=10, ~1.7–1.8× plateau at M≥100 vs M independent queries. The win is bounded by the point where per-query SIMD distance compute matches the saved I/O; it grows on slower storage where the shared scan is the expensive component.
Takeaway for design: keep the hot path in Lance Rust. Spark should be a thin fragment-dispatch + top-K-merge layer (à la PR lance-format/lance-spark#608); every Arrow→InternalRow→shuffle→InternalRow→Arrow round-trip in the JVM is measured overhead, not speedup.
Outline
- Q1: What are we trying to do?
- Q2: What this is NOT designed to solve
- Q3: How is it done today, and what are the limits?
- Q4: What is new in this approach?
- Q4.6: Beyond join — single-query and batched-query retrieval
- Q5: Why is each hot-path piece in Lance, not Spark?
- Q6: Who cares?
- Q7: Benchmarks
- Q8: Risks
- Q9: Implementation status
- Q10: Phasing
- Open questions
- References
Q1: What are we trying to do?
We want leftDf.kNearestJoin(rightDf, leftVec, rightVec, k, ...) in Spark to run fast and at production scale, regardless of what rightDf actually is. "Fast" means single-digit seconds at million-row R, not minutes. "Regardless" means: a Lance dataset, a parquet table, a Delta table, a wide-payload table, or an arbitrary upstream Spark subplan — the user shouldn't have to learn three different APIs.
Concretely, the proposal adds three execution paths backed by Lance, plus a routing layer that picks the right one:
| If R is... | The path is... |
|---|---|
| A Lance dataset (with or without a vector index) | Path A: Lance-native KNN (existing; #541 upstream) |
| A direct parquet or Delta scan | Path C: external Lance vector index over the source files (new; #4 / PR #5) |
| Anything else (joins, projections, filters, computed columns, subqueries) | Path B: per-query temp Lance materialization (new; #2 / PR #3) |
All three paths share the same probe-and-refine algorithm internally. The differences are about where the data lives and what migration cost the user can tolerate.
Beyond the bulk join, the same Lance primitives also power single-query retrieval — the RAG / "find the 10 nearest docs to this query" shape. The Rust core and JNI surface already accept a single query and return top-k; the Scala wrapper LanceParquetIndex (PR #5) adds a driver-side searchToDF so the result composes with the rest of a Spark pipeline without paying Spark's ~100-300 ms scheduler floor. Section 4.6 covers all three caller patterns: any-JVM service (no SparkSession), Spark-driver single-query, Spark-distributed batched queries.
The proposal also explains, component by component, why every piece of the hot path must live in Lance Rust (with SIMD distance kernels and page-index-aware parquet random access), not in Spark JVM code — and why a pure-Spark approach hits a wall well before production scale.
Q2: What this is NOT designed to solve
- Generic ANN library replacement. We reuse Lance's existing IVF-PQ implementation (with HNSW and IVF-FLAT as later phases). We don't propose new index types. Users who want HNSW-on-everything have other libraries.
- Real-time index updates. Builds happen offline or at query time; live INSERT/UPDATE propagation into the index is a Phase 4 problem, not Phase 1.
- Distributed index build. Lance has a distributed indexing primitive, and it can be invoked from each Spark task, but coordinating the per-task builds is the caller's job today. (Phase 4 candidate.)
- Data migration tooling. This proposal does NOT push users toward "rewrite everything as Lance." Path C explicitly lets users keep parquet as the source of truth.
- Cross-engine portability. This is a Spark integration. Velox / Gluten / DuckDB are separate efforts (Velox PR #16556, lance-c work) that this proposal complements but does not subsume.
Q3: How is it done today, and what are the limits?
3.1 Spark's brute-force crossJoin — the natural-but-wrong default
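```scala
// What a Spark user writes today on parquet R (k = neighbors per left row)
import org.apache.spark.sql.functions._
// Exact L2 as an opaque Scala UDF over array<float> columns
val l2Udf = udf((a: Seq[Float], b: Seq[Float]) => math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum))
left.crossJoin(right)
  .withColumn("__dist", l2Udf(col("lvec"), col("rvec")))
  .groupBy("lid")
  .agg(slice(sort_array(collect_list(struct("__dist", "rid")), asc = true), 1, k).as("topk"))
```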
This is what Spark 4.2's RewriteNearestByJoin lowers to. The work it does:
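- Cartesian product of |L| × |R| rows. At |L|=1000, |R|=10M this is 10 billion (vector, vector) pairs.
- One UDF call per pair to compute L2: 10B JVM-level distance evaluations.
- One groupBy + sort_array per left row. For the code above, collect_list buffers all |R| candidates for each left row before slice trims them to K, so memory pressure scales with |R| per group (|L| × |R| overall), not with K × |L|.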
For a real workload (|L|=1000, |R|=10M, dim=128), this is ~30+ minutes of cluster time at minimum, dominated by the 10B distance evaluations and the giant shuffle.
This is the baseline every Spark user hits today. It's correct, it's exact, and it's unusable past tens of millions of pairs.
3.2 Spark MLlib BucketedRandomProjectionLSH
The "obvious" approximate alternative built into Spark MLlib. Sketch:
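```scala
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
val lsh = new BucketedRandomProjectionLSH().setInputCol("vec").setOutputCol("hashes")
  .setBucketLength(2.0).setNumHashTables(5)
val model = lsh.fit(rightDfWithVecCol) // "vec" must be an ml.linalg.Vector column on both sides
// A huge threshold keeps every colliding pair; top-K has to be computed afterwards
val pairs = model.approxSimilarityJoin(left, right, threshold = 1e9, distCol = "__dist")
// Then: groupBy lid, take top-K
```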
Reading LSH.scala reveals what approxSimilarityJoin actually does:
```scala
// Simplified paraphrase of LSH.approxSimilarityJoin (not verbatim source)
val explodedA = processDataset(datasetA, ...) // |L| × numHashTables rows
val explodedB = processDataset(datasetB, ...) // |R| × numHashTables rows
val joined = explodedA.join(explodedB, "hashValue").drop(...).distinct()
val withDist = joined.withColumn("__dist", l2Udf(...))
withDist.filter(col("__dist") < threshold)
```
The cost analysis:
- Explode: |R| × numHashTables rows. At |R|=1M, numHashTables=5 that's 5M rows in explodedB.
- Hash join on hashValue: a Spark shuffle join keyed on the LSH bucket. The number of (L, R) collision pairs scales with |L| × |R| / num_buckets, summed across hash tables.
- Distance UDF on every collision pair. Each is ~100s of nanoseconds in JIT-compiled Scala.
- Plus a distinct() that costs another shuffle.
At our wide-medium config (|L|=100, |R|=1M, dim=128, numHashTables=5) this is 30-60 seconds per query dominated by the shuffle. At mega-medium (|R|=10M) it's minutes per query.
The fundamental problem isn't tuning. LSH was designed for similarity joins with approximate threshold filtering: return every pair within some distance. It is the wrong algorithm for the "given a query vector, return top-K from a static R" workload, which is exactly the kNN-by-join shape. For that workload, IVF-PQ scores nprobes × partition_size candidates, where partition_size ≈ |R| / num_partitions; growing num_partitions along with |R| keeps per-query work sub-linear in |R|. LSH instead visits hash-bucket collisions whose count scales with |R| (linear at best, degenerating toward |L| × |R| if buckets collapse).
We kept LSH in the bench harness (skippable via BENCH_SKIP_LSH=true) for honesty, but excluded it from cluster runs as uninformative: the comparison would only confirm that LSH is much slower.
3.3 The Spark-native KNN wall
A natural question is: "Can't we just write IVF-PQ probe + refine in Spark?" The honest answer is no — the wall is built from six independent costs, each of which alone is enough to disqualify the approach.
3.3.1 JVM row population overhead
Spark stores an array<float> vector column (its counterpart of Arrow's FixedSizeList[Float]) as UnsafeArrayData embedded in an UnsafeRow. Reading a single dim=128 vector means:
- 1 outer-row dereference
- 1 array-pointer dereference into the (on- or off-heap) UnsafeRow memory
- 128 getFloat(i) calls, each computing an offset and reading 4 bytes from UnsafeRow
- A Vectors.dense(arr.toArray.map(_.toDouble)) conversion if calling MLlib (allocates a new double[] and a DenseVector object)
For a probe that touches K × refine_factor = 80 candidate vectors per query, that's 80 × 128 = 10,240 boxed reads + 80 fresh DenseVector allocations PER QUERY. At |L|=1000 queries × this work, it's ~10M boxed reads and ~80K allocations per query batch — and that's before doing any actual distance computation.
Lance, by contrast, reads the entire candidate set as a flat Float32Array (one allocation, contiguous memory) and feeds it directly to a SIMD distance kernel that can retire 16 float multiply-adds per cycle on AVX2 (two 8-lane FMA units). The throughput difference is roughly 50-100×.
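A minimal Rust sketch of that flat-buffer layout follows. It is not lance-linalg's kernel: it assumes candidates are stored row-major as n × dim f32 values, returns squared L2 (which ranks candidates identically to L2), and uses eight independent accumulators, a loop shape LLVM can usually auto-vectorize for AVX2 in release builds, though that is the compiler's decision rather than a guarantee. The function names are hypothetical.

```rust
/// Squared L2 distance between two equal-length slices.
/// Eight independent accumulators mirror one 8-lane AVX2 register and give
/// the optimizer a loop shape it can vectorize.
fn l2_squared(a: &[f32], b: &[f32]) -> f32 {
    assert_eq!(a.len(), b.len());
    let mut acc = [0.0f32; 8];
    let chunks_a = a.chunks_exact(8);
    let chunks_b = b.chunks_exact(8);
    let tail_a = chunks_a.remainder();
    let tail_b = chunks_b.remainder();
    for (ca, cb) in chunks_a.zip(chunks_b) {
        for i in 0..8 {
            let d = ca[i] - cb[i];
            acc[i] += d * d;
        }
    }
    // Scalar tail for dims not divisible by 8, then the horizontal sum.
    let mut sum: f32 = tail_a.iter().zip(tail_b).map(|(x, y)| (x - y) * (x - y)).sum();
    sum += acc.iter().sum::<f32>();
    sum
}

/// Scores candidates stored back to back in one flat buffer
/// (candidates.len() == n * dim), returning (candidate_index, squared_l2).
fn score_flat(query: &[f32], candidates: &[f32], dim: usize) -> Vec<(usize, f32)> {
    candidates
        .chunks_exact(dim)
        .enumerate()
        .map(|(i, c)| (i, l2_squared(query, c)))
        .collect()
}
```

The input here is a single contiguous allocation with no per-vector objects, which is exactly the property that per-element UnsafeArrayData access lacks.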
3.3.2 Shuffle bandwidth for partition-grouping
If we tried to implement IVF probe in Spark, the natural shape would be:
```text
left.rdd
  ├── FOR EACH left row, compute partition_id = nearest_centroid(query)
  └── shuffle by partition_id, joining against R-partitioned-by-centroid
```
But R doesn't naturally partition by IVF centroid — you'd have to repartition R first, which means shuffling all of R's vector data once before any query runs. At wide-medium that's ~500 MB; at mega it's ~5 GB. This shuffle is per-query unless R is pre-shuffled into a persistent dataset — which is exactly what a Lance dataset already does internally for free.
In other words: doing IVF-PQ in Spark requires recreating Lance's on-disk partition layout via Spark shuffles. You'd duplicate Lance's storage layout work in shuffle code that doesn't survive across queries.
3.3.3 Distance kernel performance
A 128-dim L2 distance computation in optimal native code:
```text
# Lance lance-linalg AVX2 path (idealized; dim=128 = 16 ymm registers of 8 f32):
16× vsubps        (a - b, 8 lanes per instruction)
16× vfmadd231ps   (acc += diff * diff)
 1× horizontal sum of the accumulator
≈ 20-30 CPU cycles per pair on modern Skylake/Zen
```
A 128-dim L2 distance in JVM Scala with a Math.fma UDF:
```text
# JVM JIT'd path (assuming the UDF gets JIT compiled):
128× scalar subtract + fma (no auto-vectorization over boxed floats)
128× boxed Float access (or unboxed if Tungsten kicks in, but the UDF wrapper doesn't help)
≈ 500-1000 CPU cycles per pair
```
Plus UDF call overhead per invocation (~50-200ns per call), Scala closure deserialization, codec inflation. Spark's Tungsten can vectorize some expression-level ops but it does NOT vectorize user-supplied UDFs over ArrayType.
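At 80 candidate distances per query × |L|=1000 queries = 80,000 pairs, the cycle counts above come to roughly 0.7 ms of native L2 (at ~3 GHz) versus roughly 13-27 ms of JIT'd JVM L2, plus ~4-16 ms of UDF-call overhead at 50-200 ns per call. That roughly 20-60× penalty sits on the innermost loop of the refine path; it is modest in absolute terms per batch, but it compounds with the row-population, shuffle, and I/O costs in the neighboring subsections.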
3.3.4 No page-index-aware parquet random access in Spark
Refinement needs to read 80 specific (file, row_index) pairs out of a 1M-row parquet file with no full scan. parquet-rs ≥58.3 supports this via PageIndexPolicy::Required + RowSelection::from_consecutive_ranges + the parquet column-and-page index. The parquet reader:
- Reads the page index out of the parquet footer
- For each requested row, finds the (row group, page) it lives in
- Issues a coalesced byte-range read for just those pages
- Decodes only the requested rows, not the surrounding 100K-row pages
parquet-mr (Spark's JVM parquet library) does NOT have this primitive. It supports row-group-level pruning (skip whole row groups based on column min/max) but not page-level random access. A "fetch 80 rows by row index" operation on parquet-mr decompresses the vector column of every touched row group in full, which means reading and decompressing ~100K rows to extract 1.
We'd need to add page-index-aware random access to parquet-mr to do refinement in Spark. That's a substantial parquet-mr contribution — and even if it lands, it's still JVM-level decompression + JVM row population (see 3.3.1) on the hot path.
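The core of page-index-aware random access is turning a scattered set of requested row indices into a few contiguous runs. A std-only Rust sketch of that step, with a hypothetical helper name: it sorts and deduplicates the indices and emits half-open ranges, which is the input shape parquet-rs's RowSelection::from_consecutive_ranges accepts (an iterator of Range<usize> plus the total row count). The parquet call itself is left out to keep the block dependency-free.

```rust
use std::ops::Range;

/// Collapses requested row indices into sorted, non-overlapping, half-open
/// ranges of consecutive rows, e.g. [7, 3, 4, 5, 9] -> [3..6, 7..8, 9..10].
fn consecutive_ranges(mut rows: Vec<usize>) -> Vec<Range<usize>> {
    rows.sort_unstable();
    rows.dedup();
    let mut out: Vec<Range<usize>> = Vec::new();
    for r in rows {
        if let Some(last) = out.last_mut() {
            // Extend the current run when the next row is adjacent.
            if last.end == r {
                last.end = r + 1;
                continue;
            }
        }
        out.push(r..r + 1);
    }
    out
}
```

Fewer, longer ranges let the reader coalesce byte-range requests for neighboring pages instead of issuing one read per row.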
3.3.5 No vector index file format in Spark
Spark has no concept of "an index file alongside a parquet table." There's no Catalyst plan node for "open this index file"; there's no DataSource v2 hook for "the table has an associated index"; there's no catalog metadata for index existence. We'd need to invent all of this in lance-spark code, or persuade upstream Spark to accept a PR adding these concepts to its planner, DSv2, and catalog APIs.
What we have today (Path C in this proposal) sidesteps all of that: the index file is a Lance artifact opened directly from the JNI handle, completely outside Catalyst's plan tree.
3.3.6 GC pressure under multi-query batches
The natural Spark code shape is "for each left row, run a probe, collect refs, then materialize." Each probe call allocates:
- A double[] for the query vector
- An ArrayList of (distance, rid) candidate tuples
- A sorted ArrayList[K] for top-K
At |L|=1000 × per-query allocations of ~1-10 KB, that is ~1-10 MB of short-lived object churn per query batch in each probing executor. Multiplied across the cluster, this can trigger minor GC every few hundred ms in the executor JVMs and major GC every few seconds. Lance's Rust path has zero JVM heap allocation on the hot path: distance scoring is in-place on contiguous Vec<f32> buffers.
Combined: any single one of these costs is ~10× per-query overhead vs the Lance-native approach. Stacked together they're 100× to 1000× — easily the difference between a one-second per-query latency and a one-minute one. This is the wall.
Q4: What is new in this approach?
4.1 The three-path architecture
We don't propose a single hot-path implementation. We propose three implementations of the same logical operation, with a Catalyst rule choosing the right one based on what rightDf is:
```text
leftDf.kNearestJoin(rightDf, ...)
  │
  ├── rightDf is a Lance scan
  │     └─ ▶ Path A: Lance-native KNN (existing)
  │
  ├── rightDf is a direct parquet/Delta file scan (no joins, no computed columns)
  │     └─ ▶ Path C: external Lance vector index over those files
  │
  └── Anything else (subplans, unsupported filters, joins, projections)
        └─ ▶ Path B: per-query temp Lance materialization, then Path A
```
The user writes the same API in every case. Routing is automatic via a Catalyst postHocResolutionRule. There are three paths because R has three meaningfully different shapes in real Spark workloads, and a single hot path for all of them necessarily compromises somewhere.
4.2 Path A: Lance-native KNN (R is already in Lance)
Reuses lance-spark's existing IndexedNearestJoin (upstream issue #541).
Hot path inside Lance:
- For each left vector, a JNI call to Dataset.scan().nearest(query, k, refine_factor).execute()
- Lance's IVF probe walks nprobes partitions, scores PQ codes, and keeps the top K × refine_factor candidates
- Lance refines by reading actual vectors from the Lance dataset (Lance random access, not parquet) and computing exact L2 in lance-linalg SIMD
- Top-K is returned across JNI as (rowid, distance) pairs
Spark side (3 staged execs):
```text
LanceProbeExec        (open Lance dataset, run nearest()/refine() per left row, emit refs)
  ↓ Exchange(_leftId)  ← Catalyst-inserted shuffle, AQE-engaged
LanceMergeExec        (TopKHeap.merge across fragment-grouped contributions)
  ↓
LanceMaterializeExec  (per task: open Lance dataset, point-fetch surviving rids, emit final Row)
```
The shuffle exists because Phase 1.5 supports probeParallelism > 1 (fragment-grouped probe), where multiple Spark tasks contribute partial top-K for the same left row. The merge stage's reduceByKey combines them.
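A minimal Rust sketch of that merge, assuming each probe task emits a partial top-K list of (row_id, distance) for the same left row. It keeps a bounded max-heap keyed on distance, so memory stays O(K) however many fragments contribute; merge_top_k and Cand are hypothetical names, not lance-spark's TopKHeap API.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

/// (distance, row_id), totally ordered via f32::total_cmp so it can live in a heap.
struct Cand(f32, u64);

impl PartialEq for Cand {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}
impl Eq for Cand {}
impl PartialOrd for Cand {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for Cand {
    fn cmp(&self, other: &Self) -> Ordering {
        self.0.total_cmp(&other.0).then(self.1.cmp(&other.1))
    }
}

/// Merges partial top-K lists from several fragments into the global top-K,
/// returned nearest first.
fn merge_top_k(partials: &[Vec<(u64, f32)>], k: usize) -> Vec<(u64, f32)> {
    // Max-heap: the root is the worst candidate currently kept.
    let mut heap: BinaryHeap<Cand> = BinaryHeap::with_capacity(k + 1);
    for &(id, dist) in partials.iter().flatten() {
        heap.push(Cand(dist, id));
        if heap.len() > k {
            heap.pop(); // evict the current worst
        }
    }
    // into_sorted_vec is ascending by Ord, i.e. by distance.
    heap.into_sorted_vec().into_iter().map(|Cand(d, id)| (id, d)).collect()
}
```

Because the merge is associative, partial merges can run map-side before the shuffle and again after it without changing the result.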
When this path applies: R is already a Lance dataset. Optimal latency, no temp write, no extra storage.
4.3 Path B: per-query temp Lance materialization (R is an arbitrary subplan)
The general-purpose answer for "R is anything else." See #2 for the full motivation.
Hot path:
- Catalyst rule sees kNearestJoin(left, right) and right doesn't unwrap to a Lance scan or a direct parquet scan
- Project right to (rid := monotonically_increasing_id(), rvec, ...payload), write to a temp Lance dataset under spark.lance.knn.tempR.dir
- Proceed exactly as Path A against the temp Lance URI
- SparkListener cleans up the temp dir on onApplicationEnd + JVM shutdown hook
When this path applies: any non-trivial R — parquet that has been filtered, joined, transformed, or computed; in-memory DataFrames; subqueries; SQL scenarios with views.
Cost: the temp Lance write is real. At |R|=1M, dim=128, narrow projection, it's ~15-20 sec on a cluster (shared object store + Lance encoding). At |R|=10M it's ~2.5 min. This cost dominates Path B, and is the load-bearing reason Path C exists for the parquet case.
4.4 Path C: external index over parquet (R is a direct parquet/Delta scan)
The new path. Lance builds an IVF-PQ index directly over the source parquet files, no temp Lance dataset, no column copies. Lance owns the parquet reader.
Hot path:
- Build (once per logical "table version", amortized across queries):
  - Lance reads the vector column from each parquet file via parquet-rs with PageIndexPolicy::Required
  - Trains kmeans + PQ codebook (lance-index existing APIs)
  - Streams (vec, rid) batches through IvfTransformer + shuffle_dataset
  - Writes index.idx (standard Lance IVF-PQ encoding) + manifest.json (parquet file list)
  - rid encoding: (file_id_u32 << 32) | row_index_u32
- Search (per query):
  - IVF probe → top K × refine_factor candidates with PQ-approx scores
  - Decode rids back to (file_path, row_index)
  - Per file: open parquet with PageIndexPolicy::Required and RowSelection::from_consecutive_ranges for the requested rows; read actual vectors
  - Compute exact L2 in lance-linalg SIMD
  - Return top-K as Vec<SearchResult { file_path, row_index, distance }>
- Materialize (post-top-K fetch):
  - Caller passes (file_path, row_index) pairs + projection columns
  - Lance batches by file, issues page-index-aware reads per file
  - Returns Arrow IPC stream → JVM decodes to Spark Rows
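The rid encoding from the build step can be sketched in a few lines of Rust. A minimal sketch, assuming file_id is the position of the parquet file in manifest.json's file list; the helper names are hypothetical.

```rust
/// Packs a parquet file id and a row index within that file into one rid:
/// (file_id_u32 << 32) | row_index_u32.
fn encode_rid(file_id: u32, row_index: u32) -> u64 {
    ((file_id as u64) << 32) | row_index as u64
}

/// Inverse of `encode_rid`: high 32 bits = file id, low 32 bits = row index.
fn decode_rid(rid: u64) -> (u32, u32) {
    ((rid >> 32) as u32, rid as u32)
}

/// Resolves a rid to (file_path, row_index) using the manifest's file list.
fn resolve<'a>(rid: u64, files: &'a [String]) -> Option<(&'a str, u32)> {
    let (file_id, row) = decode_rid(rid);
    files.get(file_id as usize).map(|p| (p.as_str(), row))
}
```

One consequence of this layout is a hard limit of 2^32 rows per parquet file and 2^32 files per manifest.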
Spark side (1 fused exec — no shuffle):
```text
left.rdd
  ↓ ExternalFusedStage (per task: open external index, probe each left row,
                        batched fetch_rows for the partition, emit final Row)
```
Why no shuffle? Lance's idx.search() returns the already-merged global top-K for one query — the cross-partition merge happens inside Lance, not Spark. So the Path A shuffle (which exists to merge partial contributions from fragment-grouped probes) is vestigial here. The shuffle was inherited from the Path A pipeline; we removed it after benchmarking confirmed it was a passthrough.
When this path applies: R is spark.read.parquet(path) or spark.read.format("delta").load(path), possibly with a passthrough Project and/or supported Filter on top. Anything else (joins, computed columns, unsupported predicates) falls through to Path B.
4.5 The routing decision tree
Catalyst rule (postHocResolutionRule, opt-in via spark.lance.knn.indexedNearestByJoin.enabled):
```text
NearestByJoin(L, R, approx=true, k, ranking, direction) seen in plan
│
├── unwrap R: SubqueryAlias / View / passthrough Project / supported Filter
│
├── R unwraps to Lance DSv2 relation
│   └── ▶ Path A (existing IndexedNearestJoin pipeline)
│
├── R unwraps to parquet/Delta LogicalRelation (Phase 3, not yet shipped)
│   ├── spark.lance.knn.externalIndex.enabled = true
│   │   └── ▶ Path C (IndexedNearestJoinExternal)
│   └── else fall through to Path B
│
├── spark.lance.knn.tempRForSqlRule.enabled = true (existing)
│   └── ▶ Path B (LanceTempR.materialize → Path A)
│
└── else: fall through to Spark's default crossJoin rewrite
```
User-visible: a single df.kNearestJoin(rightDf, ...) API. Three execution strategies under the hood.
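The decision tree reads as a pure function from the unwrapped shape of R plus two config flags to a path. The Rust sketch below uses hypothetical type names and assumes the outer gate, spark.lance.knn.indexedNearestByJoin.enabled, is already on; the real rule is a Scala Catalyst postHocResolutionRule over logical plans, and the Path C branch reflects the Phase 3 design rather than shipped behavior.

```rust
/// What R unwraps to after peeling SubqueryAlias / View / passthrough Project / supported Filter.
#[derive(Debug, Clone, Copy, PartialEq)]
enum RShape {
    LanceRelation,
    ParquetOrDelta,
    Other,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Route {
    PathA,
    PathB,
    PathC,
    DefaultCrossJoin,
}

/// Flags stand in for spark.lance.knn.externalIndex.enabled and
/// spark.lance.knn.tempRForSqlRule.enabled.
fn route(shape: RShape, external_index_enabled: bool, temp_r_enabled: bool) -> Route {
    match shape {
        RShape::LanceRelation => Route::PathA,
        RShape::ParquetOrDelta if external_index_enabled => Route::PathC,
        // Parquet without the external index, or anything else, falls through.
        _ if temp_r_enabled => Route::PathB,
        _ => Route::DefaultCrossJoin,
    }
}
```

Keeping the decision free of side effects makes every branch easy to unit-test independently of plan construction.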
4.6 Beyond join: single-query and batched-query retrieval
The same primitives that power the bulk join also serve a second use case that came up in feedback: single-query retrieval — one (or a small batch of) query vector(s) producing top-k rows. This is the RAG / search-bar / "nearest 10 docs to this prompt" shape. The bulk join is the wrong tool for it: shipping a 1-row DataFrame to executors, running one probe, shipping one row back is pure scheduler overhead.
The Rust core and JNI already accept a single query: ExternalIvfPqIndex::search(query, k, nprobes, refine_factor, filter) -> Vec<SearchResult>. PR #5 ships a thin Scala wrapper, LanceParquetIndex, that adapts the JNI surface to DataFrame-returning entry points so the result composes with downstream Spark transforms. Three caller patterns, in increasing order of "Spark-ness":
Pattern 1: any JVM caller, no Spark dependency
Index files are self-contained in object storage (manifest.json + index.idx). Any JVM process can open and search:
```java
import java.util.List;
import org.lance.index.external.ExternalIvfPqIndex;

// indexUri and qvec are supplied by the caller
try (ExternalIvfPqIndex idx = ExternalIvfPqIndex.open(indexUri)) {
    List<SearchResult> hits = idx.search(qvec, /*k*/ 10, /*nprobes*/ 16, /*refineFactor*/ 8, /*filter*/ null);
    byte[] arrowIpc = idx.fetchRows(
        hits.stream().map(h -> ParquetRowKey.of(h.getFilePath(), h.getRowIndex())).toList(),
        List.of("doc_id", "title", "url"));
    // decode arrowIpc with ArrowStreamReader
}
```
No SparkSession, no org.apache.spark dependency on the classpath. Build the index once via Spark or an offline job, then serve queries from a Trino UDF, Presto plugin, a Java request-handler, a Jupyter kernel — anywhere the JVM runs. Per-query latency is bounded by IVF probe + 80 candidate refinement reads (~1-5 ms warm, ~50 ms cold-mmap).
This pattern is fully shipped today as part of Phase 1; no Spark wrapper is needed for it. We call it out in the SPIP because it's the load-bearing reason the index file format is engine-independent — Spark is the integration we're proposing here, but Lance's vector index over parquet is a primitive that other engines can reuse.
Pattern 2: Spark notebook / job, one query at a time
A user is in a Spark session (notebook, structured streaming sink, batch job) and wants top-k rows for a single query vector to feed into downstream Spark transforms (joins, projections, aggregations, UDFs). PR #5 ships LanceParquetIndex for this:
```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.lance.spark.knn.LanceParquetIndex

implicit val s: SparkSession = spark
val idx = LanceParquetIndex.buildIfMissing(
  spark,
  filePaths = Seq("/data/embeddings-0.parquet", "/data/embeddings-1.parquet"),
  vectorColumn = "vec")
try {
  // 10 nearest rows to qvec, payload cols materialized via Lance's parquet reader
  val topK: DataFrame = idx.searchToDF(qvec, k = 10, projection = Seq("doc_id", "title", "url"))
  // Composes with normal Spark transforms downstream:
  val enriched = topK
    .join(spark.table("metadata"), Seq("doc_id"))
    .filter("category = 'tech'")
} finally idx.close()
```
Why driver-side, not a Spark stage. A single probe is ~1-5 ms warm (the JNI call + IVF probe + 80 candidate refinements + Arrow IPC encode). Wrapping that in a parallelize(Seq(qvec)).mapPartitions { idx.search(...) } would cost:
| Cost | Magnitude |
|---|---|
| Spark task launch | ~50-200 ms |
| Stage submission overhead | ~50-100 ms |
| Result block fetch back to driver | ~10-50 ms |
| Spark scheduler floor | ~110-350 ms total |
versus ~1-5 ms for the driver-local call. The Spark wrapping isn't just "no benefit": at 110-350 ms against 1-5 ms it is strictly slower, by one to two orders of magnitude. So LanceParquetIndex.search and searchToDF both run on the driver. The returned DataFrame has 1 partition (because k is bounded, there is nothing to parallelize), and from that point on it composes with the rest of the Spark plan. That composition is the only reason it's a DataFrame at all rather than a Seq.
Returned schema:
| searchToDF form | Schema |
|---|---|
| searchToDF(qvec, k) (no projection) | (file_path STRING, row_index LONG, score FLOAT): keys + exact distance |
| searchToDF(qvec, k, projection = Seq("doc_id", "title")) | (doc_id ..., title ..., score FLOAT): payload columns from parquet + score |
When projection is non-empty, the wrapper issues idx.fetchRows(keys, projection) after search and joins the payload into the row. The schema for the projection columns is read from the parquet footer (cached on first call). One Arrow IPC round-trip; no Spark shuffle.
Index lifecycle is shared with the bulk-join path. buildIfMissing keys the index file on sha256(sorted file paths + vector column + params). If the bulk-join path (IndexedNearestJoinExternal) already built an index over the same files in this Spark application, LanceParquetIndex.buildIfMissing returns immediately with the existing URI. Cleanup is application-scoped (SparkListenerApplicationEnd + JVM shutdown hook).
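The cache-key idea can be sketched as canonicalize-then-hash: sort the file paths so listing order does not matter, then append the vector column and build parameters. In this Rust sketch std's DefaultHasher stands in for SHA-256 only to stay dependency-free (unlike SHA-256 it is not a stable cryptographic digest across Rust releases); the separators, parameter encoding, and function names are assumptions.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Canonical string the cache key is derived from: sorted paths, column, sorted params.
fn canonical_key(file_paths: &[&str], vector_column: &str, params: &[(&str, &str)]) -> String {
    let mut paths = file_paths.to_vec();
    paths.sort_unstable();
    let mut params = params.to_vec();
    params.sort_unstable();
    let params: Vec<String> = params.iter().map(|(k, v)| format!("{}={}", k, v)).collect();
    // '\n' as separator assumes it never appears in paths or column names.
    format!("{}\n{}\n{}", paths.join("\n"), vector_column, params.join(","))
}

/// Stand-in digest; a real implementation would use SHA-256 here.
fn cache_key(file_paths: &[&str], vector_column: &str, params: &[(&str, &str)]) -> u64 {
    let mut h = DefaultHasher::new();
    canonical_key(file_paths, vector_column, params).hash(&mut h);
    h.finish()
}
```

Sorting before hashing is what lets the bulk-join path and buildIfMissing hit the same index even when they list the files in different orders.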
Cluster requirement. spark.lance.knn.externalIndex.dir must point at a shared filesystem (s3://, abfss://, hdfs://, ...) when running on a non-local master, because the index file lives there and any Spark task that later reads it needs cross-executor visibility. The wrapper fails fast at buildIfMissing time if this isn't set on a non-local master.
Filtering and deletes. search accepts a packed byte[] of deleted (file_id << 32) | row_index rids — useful for honoring Delta deletion vectors / Iceberg position deletes without rebuilding the index. The RowFilter plumbing is the same as the bulk-join path (Phase 1).
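The byte layout of the packed deleted-rid buffer is not specified here. A minimal Rust sketch, assuming consecutive 8-byte little-endian u64 rids, shows caller-side packing, decoding into a set, and dropping deleted candidates before they reach top-K. The byte order and all function names are assumptions.

```rust
use std::collections::HashSet;

/// Packs deleted rids ((file_id << 32) | row_index) as consecutive little-endian u64s.
fn pack_deleted(rids: &[u64]) -> Vec<u8> {
    rids.iter().flat_map(|r| r.to_le_bytes()).collect()
}

/// Decodes the packed buffer into a lookup set; a length that is not a
/// multiple of 8 is rejected.
fn unpack_deleted(buf: &[u8]) -> Option<HashSet<u64>> {
    if buf.len() % 8 != 0 {
        return None;
    }
    let set: HashSet<u64> = buf
        .chunks_exact(8)
        .map(|c| {
            let mut b = [0u8; 8];
            b.copy_from_slice(c);
            u64::from_le_bytes(b)
        })
        .collect();
    Some(set)
}

/// Drops deleted candidates before they can enter the top-K.
fn filter_candidates(cands: Vec<(u64, f32)>, deleted: &HashSet<u64>) -> Vec<(u64, f32)> {
    cands.into_iter().filter(|(rid, _)| !deleted.contains(rid)).collect()
}
```

Filtering before top-K selection (rather than after) matters: removing deleted rows from an already-truncated top-K would return fewer than k results.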
What this pattern is NOT. This is not for batched query workloads — see Pattern 3. The driver call is sequential; running 10K queries through it on the driver is 10K × ~3 ms = 30 sec single-threaded. For batches, use the bulk join, which distributes across executors.
Pattern 3: Spark-cluster caller, many independent queries
A batch of query vectors: e.g., 10K user queries to score against a 100M-row corpus, or "for each row in a small probe table, find its top-10 neighbors." This is already Path C: the queries play the role of L (distributed), and the indexed corpus plays the role of R.
```scala
import org.apache.spark.sql.DataFrame

val queries: DataFrame = ??? // small, distributed
val corpus: DataFrame = spark.read.parquet(corpusPath)
// Shipping today (Phase 1.5):
val topK = queries.kNearestJoin(corpus, "qvec", "vec", k = 10)
```
Nothing new is required for this — the join already covers it, and the bulk-join path uses the same ExternalIndexLifecycle cache as LanceParquetIndex.buildIfMissing, so an index built once is shared across both. The only sugar we'd add later is a method-on-index alias for callers who prefer that shape:
```scala
// Phase 2 sugar (NOT shipped; sketch only):
val idx = LanceParquetIndex.buildIfMissing(spark, filePaths, "vec")
val topK: DataFrame = idx.searchBatch(queries, qCol = "qvec", k = 10)
// Equivalent to: queries.kNearestJoin(corpus, "qvec", "vec", k) under Path C
```
Index lifetime is shared across all three patterns
The index built once — by any caller — can be opened by any of the three patterns. The manifest (manifest.json next to index.idx) is self-describing: caller doesn't need to know how it was built. This is the load-bearing reason the index lives in object storage as a self-contained directory, not inside a Lance dataset directory tree. A maintenance script can build the index nightly; a Spark job, a Java service, and a Trino UDF can all read it concurrently.
Surface area (in PR #5 today unless noted)
| Pattern | Class | Method | Status | Purpose |
|---|---|---|---|---|
| 1 | org.lance.index.external.ExternalIvfPqIndex | build / open / search / fetchRows | shipped (Phase 1) | JNI handle. No Spark dependency. Any JVM caller. |
| 2 | LanceParquetIndex | build / buildIfMissing / open | shipped | Scala factory. buildIfMissing shares the SHA-256-keyed cache with IndexedNearestJoinExternal. |
| 2 | LanceParquetIndex | search(query, k, ...): Seq[SearchResult] | shipped | Driver-side single-query top-k. |
| 2 | LanceParquetIndex | searchToDF(query, k, projection?): DataFrame | shipped | 1-partition DataFrame for pipeline composition. With projection, materializes payload via Lance's parquet reader. |
| 2 | LanceParquetIndex | fetchRowsToDF(refs, projection): DataFrame | shipped | Random-access fetch in caller order. |
| 3 | IndexedNearestJoinExternal | apply(left, right, k, ...) | shipped | Bulk join. Pattern 3 today via df.kNearestJoin(...). |
| 3 | LanceParquetIndex | searchBatch(queries, qCol, k, ...): DataFrame | proposed (Phase 2 sugar) | Method-on-index alias for the bulk join. Not load-bearing: kNearestJoin already covers the use case. |
The driver-side surface (Pattern 2) is exercised by LanceParquetIndexTest: 5 tests covering JNI round-trip, DataFrame schema shape, projection materialization, fetchRowsToDF ordering, and cache reuse with IndexedNearestJoinExternal. Recall is exercised in external_index_phase1.rs on the Rust side; the Scala test focuses on what the wrapper itself can break.
Q5: Why is each hot-path piece in Lance, not Spark?
This section answers the question more precisely than Q3.3 by walking through each step of the probe-refine-materialize loop and showing what Spark would have to do to replicate it.
5.1 SIMD distance kernels
Lance's lance-linalg crate ships AVX2 / NEON / FP16 implementations of L2, dot, and cosine distance. These are the kernels every probe call inside Lance hits. Examples from the Rust source:
lance-linalg/src/distance/l2.rs::l2() auto-detects target CPU features at runtime and dispatches to an unrolled AVX2 loop on x86_64:
- 8 floats per AVX2 ymm register × 2 flops per FMA × 2 FMA units = 32 flops per cycle peak on Skylake+
- Tested at 30-50 GFLOPS sustained on a single core at dim=128
The equivalent in Spark JVM:
- Best case (plain UDF): udf((a: Seq[Float], b: Seq[Float]) => l2(a, b)). Tungsten cannot vectorize this UDF. Each pair is ~500-1000 cycles in JIT'd code.
- With the Java Vector API (optionally, on Spark 3.5+ JVMs): explicit SIMD is possible in hand-written JVM code, but only 128-bit lanes are safely portable, and the incubator module (jdk.incubator.vector) isn't enabled by default in Spark builds.
- Spark Catalyst expressions for vector ops (Spark 4.2 added VectorL2Distance, VectorCosineSimilarity, VectorInnerProduct): these CAN go to Tungsten codegen, but the kernels are not SIMD-tuned and run in the JVM's bytecode interpreter or JIT'd scalar paths.
Honest comparison at dim=128: Lance's native kernel is 30-50 GFLOPS; Spark's UDF path is 0.5-2 GFLOPS depending on JIT. 20-50× slower per distance.
For one query × K × refine_factor = 80 distances at dim=128, that's:
- Lance: ~80 × 128 × 2 / 30 GFLOPS ≈ 0.7 microseconds
- Spark UDF: ~80 × 128 × 2 / 1 GFLOPS ≈ 20 microseconds
Per query the gap is 20μs — small. But at |L|=1000 queries × refine, it's 20 ms vs 0.7 ms of pure distance work, plus the UDF-call and row-population overhead per evaluation. And this is just one component of the wall.
5.2 Page-index-aware parquet random access
Refinement reads ~80 specific rows per query out of a 1M-row parquet file. The two ways to do this:
| Method | What it costs | Where it's available |
|---|---|---|
| Page index + RowSelection (the right way) | ~16 KB read for the page index + ~80 × ~4 KB page reads = ~330 KB total per query | parquet-rs ≥58.3 with PageIndexPolicy::Required. Lance uses this. |
| Row group filter + full decompression | Decompress 1-10 row groups (~50K-500K rows × 512 B per vector = 25-250 MB) to extract 80 rows | parquet-mr (Spark's JVM library) today. No fix in flight upstream. |
Lance's path: ~330 KB I/O per query, decoded to ~80 vectors in microseconds.
Spark's parquet-mr path: ~25-250 MB I/O per query, decoded to one or more full row groups' worth of vectors, then 80 of them extracted. That is roughly 75-750× more bytes read per query.
This isn't a Spark configuration issue. parquet-mr genuinely doesn't support page-level row selection. Building it is a months-long parquet-mr contribution + Spark integration. Lance just ships parquet-rs as a dep and gets the right behavior immediately.
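In parquet-rs, the page-index path is expressed as a RowSelection built from RowSelector::skip / RowSelector::select runs. The std-only sketch below mirrors that shape with a local `Selector` enum (hypothetical, not the parquet-rs type) and, as a simplifying assumption, treats every data page as holding the same number of rows when counting pages touched.

```rust
/// One run of a row selection, mirroring the skip/select shape of
/// parquet-rs's RowSelector (this local enum is hypothetical).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Selector {
    Skip(usize),
    Select(usize),
}

/// Convert sorted, unique row indices into alternating skip/select runs
/// that together cover all `total_rows` rows of the row group.
pub fn to_selection(rows: &[usize], total_rows: usize) -> Vec<Selector> {
    let mut out = Vec::new();
    let mut cursor = 0; // first row not yet covered by a run
    let mut i = 0;
    while i < rows.len() {
        let start = rows[i];
        assert!(start >= cursor && start < total_rows, "rows must be sorted, unique and in range");
        // Extend the run while the next requested row is adjacent.
        let mut end = start + 1;
        i += 1;
        while i < rows.len() && rows[i] == end {
            end += 1;
            i += 1;
        }
        if start > cursor {
            out.push(Selector::Skip(start - cursor));
        }
        out.push(Selector::Select(end - start));
        cursor = end;
    }
    assert!(cursor <= total_rows, "row index out of range");
    if cursor < total_rows {
        out.push(Selector::Skip(total_rows - cursor));
    }
    out
}

/// Number of distinct data pages touched, assuming a fixed `rows_per_page`.
pub fn pages_touched(rows: &[usize], rows_per_page: usize) -> usize {
    let mut pages: Vec<usize> = rows.iter().map(|r| r / rows_per_page).collect();
    pages.dedup(); // rows are sorted, so equal page ids are adjacent
    pages.len()
}
```

Only the selected runs, and the pages they fall on, need to be fetched and decoded; the skip runs cost nothing beyond the page-index lookup.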
5.3 PQ-encoded vector storage
Product Quantization (PQ) is an approximate vector encoding scheme:
- A dim=128, num_sub_vectors=16 PQ codebook stores 256 representative codewords per 8-dim sub-vector
- Each vector is encoded as 16 bytes (one byte per sub-vector, 8 bits = 256 codewords)
- Distance to a query is approximated via lookup tables: precompute distance from query's sub-vector slice to each of 256 codewords, then sum 16 lookups per candidate
This compresses dim=128 vectors from 512 bytes to 16 bytes (32× less data per candidate) and replaces each candidate's full 128-dimension L2 with 16 SIMD-friendly table lookups and adds.
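The encoding and lookup-table scoring described above can be sketched in std-only Rust. The `Pq` struct and its methods are hypothetical (not Lance's ProductQuantizer API), codebooks are assumed to be already trained, and scores are squared L2. The test checks the property that makes lookup-table scoring exact with respect to the codes: summing the per-sub-vector table entries equals the squared distance from the query to the decoded vector.

```rust
/// Product-quantization sketch with pre-trained codebooks.
/// `codebooks[m][c]` is codeword `c` (length `sub_dim`) for sub-vector `m`.
pub struct Pq {
    pub sub_dim: usize,
    pub codebooks: Vec<Vec<Vec<f32>>>,
}

fn sq_dist(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).map(|(x, y)| (x - y) * (x - y)).sum()
}

impl Pq {
    fn sub<'a>(&self, v: &'a [f32], m: usize) -> &'a [f32] {
        &v[m * self.sub_dim..(m + 1) * self.sub_dim]
    }

    /// Encode: index of the nearest codeword per sub-vector (one byte each, <= 256 codewords).
    pub fn encode(&self, v: &[f32]) -> Vec<u8> {
        (0..self.codebooks.len())
            .map(|m| {
                let sub = self.sub(v, m);
                let (best, _) = self.codebooks[m]
                    .iter()
                    .enumerate()
                    .map(|(c, w)| (c, sq_dist(sub, w)))
                    .fold((0, f32::INFINITY), |acc, x| if x.1 < acc.1 { x } else { acc });
                best as u8
            })
            .collect()
    }

    /// Per-query lookup table, built once per query:
    /// table[m][c] = squared distance from the query's m-th sub-vector to codeword c.
    pub fn lookup_table(&self, q: &[f32]) -> Vec<Vec<f32>> {
        (0..self.codebooks.len())
            .map(|m| {
                self.codebooks[m]
                    .iter()
                    .map(|w| sq_dist(self.sub(q, m), w))
                    .collect::<Vec<f32>>()
            })
            .collect()
    }

    /// Approximate distance for one candidate: one lookup + add per sub-vector.
    pub fn adc_distance(table: &[Vec<f32>], code: &[u8]) -> f32 {
        code.iter().enumerate().map(|(m, &c)| table[m][c as usize]).sum()
    }

    /// Reconstruct the vector a code stands for (used here only to check scoring).
    pub fn decode(&self, code: &[u8]) -> Vec<f32> {
        code.iter()
            .enumerate()
            .flat_map(|(m, &c)| self.codebooks[m][c as usize].iter().copied())
            .collect()
    }
}
```

The table costs num_sub_vectors × 256 small distances per query, after which every candidate in every probed posting list is scored with only byte-indexed lookups.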
Spark has no PQ. To replicate, we'd need:
- A PQ codebook training algorithm (k-means × 16 sub-vectors, ~100 LoC)
- An on-disk format for PQ codes
- A lookup-table distance scorer
- Catalyst plan nodes for the IVF probe + PQ scoring
All of which exists in Lance. Reimplementing in Spark is a 6-month project even if no one cares about SIMD performance — and at production scale you very much do care.
5.4 IVF posting list traversal
IVF stores K-means cluster centroids and a posting list per cluster. A query:
- Scores against all num_partitions centroids (one big distance kernel call)
- Picks the closest nprobes
- For each probed cluster, walks its posting list applying PQ scoring to candidates
This is a tight loop. Lance does it in a single JNI call (idx.search(query, k, ...) returns a Vec<SearchResult>).
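The three steps above can be sketched as a single function. Assumptions: `ivf_search` is a hypothetical name, posting lists hold raw vectors, and exact squared L2 stands in for PQ scoring to keep the example short. A bounded max-heap keeps the running top-k, so memory stays at k entries however long the posting lists are.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

/// A scored candidate, ordered by distance (ties broken by row id) so a
/// max-heap keeps the current worst candidate on top.
struct Cand {
    dist: f32,
    row: u64,
}

impl Ord for Cand {
    fn cmp(&self, other: &Self) -> Ordering {
        self.dist.total_cmp(&other.dist).then(self.row.cmp(&other.row))
    }
}
impl PartialOrd for Cand {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl PartialEq for Cand {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}
impl Eq for Cand {}

fn sq_dist(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).map(|(x, y)| (x - y) * (x - y)).sum()
}

/// IVF search sketch. `postings[p]` holds (row_id, vector) pairs for partition p.
pub fn ivf_search(
    query: &[f32],
    centroids: &[Vec<f32>],
    postings: &[Vec<(u64, Vec<f32>)>],
    nprobes: usize,
    k: usize,
) -> Vec<(u64, f32)> {
    // 1. Score the query against every centroid.
    let mut parts: Vec<(usize, f32)> = centroids
        .iter()
        .enumerate()
        .map(|(p, c)| (p, sq_dist(query, c)))
        .collect();
    // 2. Keep the `nprobes` closest partitions.
    parts.sort_by(|a, b| a.1.total_cmp(&b.1));
    parts.truncate(nprobes);
    // 3. Walk each probed posting list with a max-heap bounded to k entries.
    let mut heap = BinaryHeap::with_capacity(k + 1);
    for (p, _) in parts {
        for (row, v) in &postings[p] {
            heap.push(Cand { dist: sq_dist(query, v), row: *row });
            if heap.len() > k {
                heap.pop(); // drop the current worst
            }
        }
    }
    let mut out: Vec<(u64, f32)> = heap.into_iter().map(|c| (c.row, c.dist)).collect();
    out.sort_by(|a, b| a.1.total_cmp(&b.1));
    out
}
```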
In Spark, doing this would require:
- A Catalyst plan node that opens the index file
- A custom physical operator that runs the loop (no Catalyst expression maps to "walk a posting list")
- Or: do it in a UDF, paying JVM-overhead per row in the posting list (~10K-100K rows per probed cluster)
The posting list is fundamentally a tight inner loop, not a Catalyst expression. Spark's architecture is built for big batched columnar operations; per-row tight loops with branches (like "is this candidate worth refining?") are precisely what falls off Tungsten codegen and into slow, row-at-a-time JVM execution.
5.5 Refinement — needs all of the above
Refinement is where 5.1, 5.2, 5.3, 5.4 stack:
For each query in left:
1. IVF probe (5.4) → 80 candidate rids with PQ-approx scores
2. Decode rids to (file_path, row_index)
3. Per-file batch: parquet random access (5.2) → 80 actual vectors
4. Compute 80 exact L2 distances (5.1) on contiguous Vec<f32> buffer
5. Sort by distance, return top-K
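Steps 2-5 can be sketched as follows. The rid layout (high 32 bits = file index, low 32 bits = row) and the `fetch` callback, which stands in for the page-index-aware parquet read of 5.2, are assumptions for illustration rather than the actual Lance encoding or API.

```rust
use std::collections::BTreeMap;

/// Hypothetical rid layout for this sketch only:
/// high 32 bits = file index, low 32 bits = row within that file.
pub fn decode_rid(rid: u64) -> (u32, u32) {
    ((rid >> 32) as u32, rid as u32)
}

fn sq_dist(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).map(|(x, y)| (x - y) * (x - y)).sum()
}

/// Refinement sketch. `fetch(file, rows)` must return one vector per requested
/// row, in the same order.
pub fn refine<F>(query: &[f32], candidates: &[u64], k: usize, fetch: F) -> Vec<(u64, f32)>
where
    F: Fn(u32, &[u32]) -> Vec<Vec<f32>>,
{
    // Step 2: decode rids and group them by file.
    let mut by_file: BTreeMap<u32, Vec<(u32, u64)>> = BTreeMap::new();
    for &rid in candidates {
        let (file, row) = decode_rid(rid);
        by_file.entry(file).or_default().push((row, rid));
    }
    let mut scored: Vec<(u64, f32)> = Vec::with_capacity(candidates.len());
    for (file, mut rows) in by_file {
        rows.sort_unstable(); // ascending row order, as a row selection expects
        let row_ids: Vec<u32> = rows.iter().map(|&(r, _)| r).collect();
        // Step 3: one batched read per file.
        let vectors = fetch(file, &row_ids);
        // Step 4: exact distances on the fetched vectors.
        for (&(_, rid), v) in rows.iter().zip(&vectors) {
            scored.push((rid, sq_dist(query, v)));
        }
    }
    // Step 5: sort by exact distance and keep the top-k.
    scored.sort_by(|a, b| a.1.total_cmp(&b.1));
    scored.truncate(k);
    scored
}
```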
If any one of these steps is in Spark (UDF, JVM, parquet-mr), the whole pipeline is bottlenecked by that step. There's no "do most of it in Lance and one piece in Spark" middle ground that performs well — the slowest step dominates.
This is why the Lance + Spark integration looks like "give Lance the input data, get the final results back" — not "do half the algorithm in one and half in the other."
5.6 Catalyst's reach: what we DID push into Spark planning
Spark contributes the parts it's actually good at:
- Plan-shape recognition. The Catalyst rule (IndexedNearestByJoinRule) detects NearestByJoin(L, R, approx=true, ...), unwraps R through SubqueryAlias/View/passthrough Project, translates supported Filter predicates to a Lance-side prefilter SQL string, and decides routing among Paths A/B/C.
- Distribution + AQE. Spark inserts the Exchange between probe and merge stages (Path A's pipeline). AQE's CoalesceShufflePartitions, OptimizeSkewJoin, and OptimizeShuffleWithLocalRead engage on this exchange.
- Final output assembly. LanceMaterializeExec emits RDD[InternalRow] matching NearestByJoin.output, then a top-level Project strips the synthetic __score column. Catalyst handles the rest of the plan's lifecycle (caching, Catalyst optimizations on the SURROUNDING plan, df.explain()).
- Filter pushdown. Right-side WHERE predicates that translate to Lance SQL get threaded through to Lance's index-side prefilter. If any predicate doesn't translate, the rule refuses the rewrite (refusal, not partial pushdown) and the query falls through to Spark's brute-force path.
Spark does what it's good at (high-level distributed plan management, optimization rules, AQE, lifecycle); Lance does what it's good at (SIMD distance, vector index, parquet random access). The boundary between them is a thin RDD-level interface. This is the right decomposition.
Q6: Who cares?
Concrete users / use cases this proposal unlocks:
- Spark KNN over parquet/Delta — the primary case. Vector embeddings stored in parquet/Delta tables (the standard for Spark warehouses) get sub-second similarity search without migrating to Lance. (Path C.)
- Spark on Lance datasets — the existing case (lance-format/lance-spark#541). For users who already have data in Lance, no change. (Path A.)
- Vector search on arbitrary subplans, e.g. Filter(Join(parquet, delta), ...): any DataFrame is supported. (Path B.)
- Iterative ML workflows — train a model, score against existing embedding tables, write results, repeat. Each query benefits from the ~30s amortized index build instead of paying per-query temp-write costs.
- Real-time inference services running on Spark: warm end-to-end latency of 600-900 ms for a 100-query batch (wide-medium) and ~5-9 s for a 1,000-query batch (mega-medium) makes interactive vector search practical.
- Multi-modal retrieval — text + image embeddings stored alongside payload columns. Path C reads only projected payload at materialize time, so wide R doesn't bottleneck.
The bigger picture: vector data is becoming a first-class column type in data warehouses (Snowflake's VECTOR, BigQuery's EMBEDDING_DISTANCE, Postgres pgvector). Spark needs to be in this story or it gets routed around. This proposal puts Spark on competitive footing without forcing users to migrate to a vector-native format.
Q7: Benchmarks
7.1 Methodology
Cluster: Spark 3.5, 16 executors × 4 cores requested, shared file:// scratch volume. The bench runs an executor-pool CPU probe before timing — a fixed-cost compute loop on every task slot — and prints per-executor median compute. It aborts (or warns) if slowest_executor_median / fastest_executor_median > 1.25×, so reviewers can confirm the run was on a uniform pool before trusting the deltas. The numbers below were collected on a confirmed-uniform pool (1.01× spread on both scales).
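The pool-uniformity gate can be sketched as: take the median compute time per executor, then divide the slowest median by the fastest. The function names and the use of plain millisecond samples are assumptions; ExecutorCpuCheck's actual implementation may differ.

```rust
/// Median of a non-empty sample (mean of the two middle values when even).
pub fn median(samples: &[f64]) -> f64 {
    assert!(!samples.is_empty(), "need at least one sample");
    let mut v = samples.to_vec();
    v.sort_by(|a, b| a.total_cmp(b));
    let n = v.len();
    if n % 2 == 1 { v[n / 2] } else { (v[n / 2 - 1] + v[n / 2]) / 2.0 }
}

/// Slowest executor median divided by fastest executor median.
pub fn pool_spread(per_executor_ms: &[Vec<f64>]) -> f64 {
    let medians: Vec<f64> = per_executor_ms.iter().map(|s| median(s)).collect();
    let slowest = medians.iter().copied().fold(f64::MIN, f64::max);
    let fastest = medians.iter().copied().fold(f64::MAX, f64::min);
    slowest / fastest
}

/// Threshold from the text: abort or warn above 1.25x.
pub const MAX_SPREAD: f64 = 1.25;

pub fn pool_is_uniform(per_executor_ms: &[Vec<f64>]) -> bool {
    pool_spread(per_executor_ms) <= MAX_SPREAD
}
```

Using per-executor medians rather than means keeps a single slow task (GC pause, noisy neighbour) from failing the gate on its own.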
Workload parameters:
- dim=128, K=10, metric=L2
- nprobes=16, refine_factor=8 (search-time)
- IVF: 256 partitions, PQ: 16 sub-vectors × 8 bits, max_iters=50
- 16 string payload columns × 64 bytes each on R
Configs (all run on the SAME data in the SAME job per scale):
- A: Spark crossJoin + min_by_k baseline (skipped on wide payload; pair count dominates regardless)
- B-narrow: Path B (temp-Lance write) + kNearestJoin, project rid only
- B-wide: Path B + kNearestJoin, project rid + 16 payload columns
- C-indexed-narrow: Path A + IVF-PQ index (R already in Lance), project rid only (apples-to-apples reference)
- C-indexed-wide: Path A + IVF-PQ index, project rid + 16 payload
- E: Path C (IndexedNearestJoinExternal over the same parquet files), project rid only
- F: MLlib BucketedRandomProjectionLSH baseline (skipped via BENCH_SKIP_LSH=true for cluster runs; see 3.2)
7.2 Results
Both scales were collected on a single cluster session (one job per scale) with 6 executors active and the pool-uniformity gate passing (≤1.01× spread). Numbers are from 2 measured runs per config (1 warmup discarded).
Wide-medium (|R|=1M, |L|=100)
| Config | Run 1 | Run 2 | Reported (slower of 2 runs) |
|---|---|---|---|
| B-narrow | 13,999 ms | 11,517 ms | 13,999 ms |
| C-indexed-narrow | 913 ms | 1,212 ms | 1,212 ms |
| E warm | 689 ms | 670 ms | 689 ms |
| E build (one-time) | n/a | n/a | 29,366 ms |
Speedups: E vs B-narrow = 20.3×. E vs C-indexed = 1.76× faster.
Mega-medium (|R|=10M, |L|=1000)
| Config | Run 1 | Run 2 | Reported (slower of 2 runs) |
|---|---|---|---|
| B-narrow | 98,334 ms | 107,227 ms | 107,227 ms (~1.8 min) |
| C-indexed-narrow | 8,358 ms | 8,229 ms | 8,358 ms |
| E warm | 8,602 ms | 8,454 ms | 8,602 ms |
| E build (one-time) | n/a | n/a | 103,806 ms |
| C-indexed pre-work | n/a | n/a | 80,055 ms (Lance write + index build, outside timing) |
Per-query (dividing by |L|=1000):
- B-narrow: ~107 ms/query
- C-indexed-narrow: ~8 ms/query
- E warm: ~9 ms/query
Speedups: E vs B-narrow = 12.5×. E vs C-indexed = 1.03× slower (~equivalent — both runs of each config land within ~2% of each other).
Stages and tasks (mega-medium, observed in run logs)
| Config | Spark stages | Tasks per stage |
|---|---|---|
| B-narrow | ~5-6 stages | parquet read of R: 32+ tasks; Lance temp write: variable; probe: 1-2 tasks; shuffle exchange: 128 tasks; merge + materialize: 128 tasks |
| C-indexed-narrow | ~3-4 stages | probe: 5-10 tasks (= number of Lance fragments); shuffle: 128 tasks; merge + materialize: 128 tasks |
| E warm | 1 stage (no shuffle) | 60 tasks (left's natural partitioning) |
E is the simplest pipeline. The fused stage (Path C) was a deliberate architectural choice after benchmarking showed the inherited Path A shuffle was vestigial here.
Per-run consistency (uniform-pool runs)
| Scale / config | Run 1 vs Run 2 spread |
|---|---|
| wide-medium B-narrow | 18% |
| wide-medium C-indexed | 25% |
| wide-medium E warm | 2.7% |
| mega-medium B-narrow | 8% |
| mega-medium C-indexed | 1.5% |
| mega-medium E warm | 1.7% |
The pool-uniformity gate keeps run-to-run spread tight enough for sub-second deltas to be meaningful.
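The spread column is consistent with (slower run minus faster run) divided by the slower run, and the speedups divide the baseline's reported time by the candidate's. A small helper pair (hypothetical names) reproduces several entries from the tables above.

```rust
/// Run-to-run spread as a percentage of the slower run.
pub fn spread_pct(run1_ms: f64, run2_ms: f64) -> f64 {
    let (slow, fast) = if run1_ms >= run2_ms { (run1_ms, run2_ms) } else { (run2_ms, run1_ms) };
    (slow - fast) / slow * 100.0
}

/// How many times faster `candidate_ms` is than `baseline_ms`.
pub fn speedup(baseline_ms: f64, candidate_ms: f64) -> f64 {
    baseline_ms / candidate_ms
}
```

For example, `spread_pct(913.0, 1212.0)` is about 24.7% (the 25% for wide-medium C-indexed), and `speedup(107227.0, 8602.0)` is about 12.5 (E vs B-narrow at mega-medium).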
7.3 What the numbers mean — and what they don't
Reproducible findings (uniform-pool, single session per scale):
- B-narrow's total job time grows roughly linearly with |R|. The temp-Lance write of (rid + rvec) is bandwidth-bound on shared scratch: at 1M rows the job takes ~14 s; at 10M rows it takes ~107 s. Predictable, dominant.
- E warm's per-query cost is sub-linear in |R|. Per-query went from ~7 ms at wide-medium to ~9 ms at mega-medium despite |R| growing 10×, because the IVF probe + PQ refinement loop touches about nprobes × partition_size + K × refine_factor candidates (partition_size = |R| / num_partitions) instead of scanning all of |R|.
- E vs B-narrow speedup is stable at 12-20× across both scales. The headline finding: external-index avoids the temp-Lance write, and that write is what dominates B's cost.
- E vs C-indexed depends on scale. At wide-medium E is ~1.76× faster (689 ms vs 1,212 ms; even C-indexed's faster run, 913 ms, is ~1.3× slower than E's slower run). At mega-medium they're indistinguishable (1.03× slower, within run-to-run noise of each config). The "stay in parquet" path is not a perf penalty.
Why E might be slightly faster than C-indexed at smaller scale:
- Path A (C-indexed) has 3-4 Spark stages with ~50-200 ms scheduler overhead each (probe → shuffle → merge → materialize). Path C (E) has 1 fused stage. At wide-medium where the per-query work is small, that overhead is a meaningful fraction of total time.
- Path A's shuffle serializes (leftId, ScoredRowRef) for K × probeParallelism × |L| refs through Spark's network stack. Path C keeps refs in-JVM.
- At mega-medium the per-query work is large enough that these constant-cost differences vanish into the noise.
What these numbers do NOT prove:
- Cold-cache parquet read performance. All numbers above are warm. Production S3/ABFS cold-cache is unmeasured and likely ~2-3× slower for E's first-after-cold-restart query.
- LSH numbers. Skipped from cluster runs; estimated 30-60 s per query at |R|=1M based on approxSimilarityJoin's O(|R|) shuffle cost.
- Long-tail latency. 2 measured runs is enough to distinguish 12× speedups from 1× speedups but not enough to characterize the p99 distribution. For latency-SLO-bound workloads, more runs are needed.
Methodology footnote. The pool-uniformity probe (ExecutorCpuCheck) is part of IndexedNearestJoinExternalBenchmark and prints a per-executor median table at the start of every run. Re-running the bench in your own cluster will produce a similar artifact you can use to decide whether the run's numbers are trustworthy.
7.4 Cloud-storage (abfss) follow-up — Phase 1.6 perf optimizations
The 7.1/7.2 numbers above are uniform-pool runs against file:// cluster scratch — no cloud-storage round-trip per refinement read. When Path C runs against abfss:// source parquet on Databricks (DBR 16.4 LTS, 2 workers × 4 cores), the per-query refinement I/O dominates: each query's 80 candidate vectors land on ~30-40 parquet pages within the row group, the page-index-aware reader coalesces them into one ~50 MB byte-range fetch, and the abfss round-trip cost shows up directly in wall-clock.
Tracking this on wide-tiny-l10 (|R|=100K, |L|=100, dim=128, 16 payload cols) on a 2-worker × 4-core DBR cluster:
| Optimization shipped (cumulative) | E warm median | Δ vs abfss baseline |
|---|---|---|
| abfss baseline (before any cloud-specific tuning) | ~12,000 ms | n/a |
| CoalescingParquetReader with 64 MiB gap + 32-parallel coalesce_ranges | 8,682 ms | -28% |
| ParquetMetaCache (per-task footer + page-index reuse across queries) | 7,471 ms | -38% |
| Tuned coalesce gap to 4 MiB + dispatched PQ scoring through compute_distances | 7,201 ms | -40% |
| search_batch: per-task unioned refinement read | 2,978 ms | -75% |
local-fs reference for the same scale: ~1,200 ms. After the batch fix the abfss path is ~2.5× over local-fs (vs. ~10× before), which matches what we expected: the residual gap is now CPU-bound on probe+PQ, not I/O.
What changed in each step
- CoalescingParquetReader: parquet-rs's default OBJECT_STORE_COALESCE_DEFAULT = 1 MiB and hardcoded 10-way parallelism are tuned for cluster-local NVMe. On abfss, where each round-trip is 30-50 ms, we wrap ParquetObjectReader to merge ranges with a tunable gap and fetch with a tunable concurrency.
- ParquetMetaCache: every per-query open_parquet_async was re-doing ObjectStore::from_uri + head + footer + page-index fetch (3 abfss round trips). The cache lives on OpenedExternalIndex and stores (ObjectStore, Path, file_size, ArrowReaderMetadata). On a hit, ParquetRecordBatchStreamBuilder::new_with_metadata skips all three round trips. Same shape as Spark's ParquetIOMetadataCache, DuckDB's parquet_metadata_cache, and parquet-rs's own new_with_metadata example.
- SIMD PQ via compute_distances: replaced the per-row scalar scoring loop in search.rs with ProductQuantizer::compute_distances, which dispatches to the SIMD distance kernels in lance-index. This required transposing PQ codes from row-major (on-disk) to column-major before scoring; one transpose per partition is far cheaper than N scalar scoring loops. The net win at 8-bit PQ was small (the SIMD path mostly fires for 4-bit), but the code now dispatches to the right kernel for any future change in PQ width.
- search_batch: the dominant fix. Adds pub async fn search_batch(opened, queries, k, nprobes, refine_factor, filter) -> Vec<Vec<SearchResult>> in lance::index::vector::external::search, with matching JNI (nativeSearchBatch), Java (ExternalIvfPqIndex.searchBatch), and Scala (ExternalIndexProbe.probeBatch) bindings. The fused Spark stage (ExternalFusedStage.fusedPartition) now collects the entire partition's left rows + queries first, then issues ONE batched probe call. Inside Lance, the batch path:
  - Probes + PQ-scores each query independently
  - Unions the candidate (file_path, row_in_file) pairs across all queries
  - Issues ONE refinement read per distinct file covering the unioned set
  - Looks each query's candidates up from the shared per-file buffer for exact distance + top-K
Per-task cost goes from N × (probe + pq + refine_io) to N × (probe + pq) + refine_io. With a 12-13-query/task batch, the I/O term collapses by ~12×.
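The batched refinement can be sketched as: union every query's (file, row) candidates per file, issue one read per file, then let each query rerank from the shared buffer. Names are hypothetical, the `read_rows` callback stands in for the parquet refinement read, and exact squared L2 is used for the rerank. This illustrates the N × (probe + pq) + refine_io cost shape; it does not reproduce search_batch itself.

```rust
use std::collections::{BTreeMap, BTreeSet, HashMap};

/// (file index, row within file) for one refinement candidate.
pub type RowKey = (u32, u32);

fn sq_dist(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).map(|(x, y)| (x - y) * (x - y)).sum()
}

/// Union every query's candidates into one sorted, de-duplicated row set per file.
pub fn union_by_file(per_query: &[Vec<RowKey>]) -> BTreeMap<u32, BTreeSet<u32>> {
    let mut out: BTreeMap<u32, BTreeSet<u32>> = BTreeMap::new();
    for cands in per_query {
        for &(file, row) in cands {
            out.entry(file).or_default().insert(row);
        }
    }
    out
}

/// Batched refinement: ONE `read_rows` call per distinct file, then each query
/// reranks its own candidates from the shared buffer and keeps its top-k.
pub fn search_batch_refine<F>(
    queries: &[Vec<f32>],
    per_query: &[Vec<RowKey>],
    k: usize,
    mut read_rows: F,
) -> Vec<Vec<(RowKey, f32)>>
where
    F: FnMut(u32, &[u32]) -> Vec<Vec<f32>>,
{
    let mut buffer: HashMap<RowKey, Vec<f32>> = HashMap::new();
    for (file, rows) in union_by_file(per_query) {
        let rows: Vec<u32> = rows.into_iter().collect();
        let vectors = read_rows(file, &rows);
        for (row, v) in rows.iter().zip(vectors) {
            buffer.insert((file, *row), v);
        }
    }
    queries
        .iter()
        .zip(per_query)
        .map(|(q, cands)| {
            let mut scored: Vec<(RowKey, f32)> =
                cands.iter().map(|key| (*key, sq_dist(q, &buffer[key]))).collect();
            scored.sort_by(|a, b| a.1.total_cmp(&b.1));
            scored.truncate(k);
            scored
        })
        .collect()
}
```

Overlapping candidates across queries (row 2 in the test) are read once, which is where the ~960 candidate pairs shrink to ~920 unique rows in the log breakdown.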
Per-batch breakdown (from RUST_LOG=lance::index::vector::external=info driver/executor logs)
extidx_search_batch n_queries=12 total=~2300ms
probe_pq ≈ 1800 ms (12 × ~150 ms — pure CPU, sequential within task)
union/topk ≈ 0 ms (in-memory hash-set + sort)
refine_open ≈ 40 ms (one ParquetMetaCache miss per task, then hits)
refine_io ≈ 470 ms (ONE ~50 MB byte-range fetch covering all candidates)
refine_files=1 always (one source parquet at this scale); candidate_pairs=960 (12 queries × 80 refine candidates). After dedup the per-file refinement read covers ~920 unique rows.
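The single ~50 MB fetch above comes from merging nearby page ranges. The range-merging half of CoalescingParquetReader can be sketched as a pure function: sort the requested byte ranges and merge any two whose gap is at most `max_gap`. This is a hypothetical illustration, not the object_store or Lance code, and it leaves out the concurrent-fetch half of the reader.

```rust
use std::ops::Range;

/// Merge byte ranges whose gap is at most `max_gap` bytes, so nearby page
/// reads become a single request. Sketch only: no concurrency, no size cap.
pub fn coalesce_ranges(ranges: &[Range<u64>], max_gap: u64) -> Vec<Range<u64>> {
    let mut sorted: Vec<Range<u64>> = ranges.to_vec();
    sorted.sort_by_key(|r| r.start);
    let mut out: Vec<Range<u64>> = Vec::new();
    for r in sorted {
        if let Some(last) = out.last_mut() {
            if r.start <= last.end.saturating_add(max_gap) {
                // Close enough (or overlapping): extend the previous request.
                last.end = last.end.max(r.end);
                continue;
            }
        }
        out.push(r);
    }
    out
}

/// Total bytes fetched, including the gap bytes a merged request pulls in.
pub fn bytes_fetched(ranges: &[Range<u64>]) -> u64 {
    ranges.iter().map(|r| r.end - r.start).sum()
}
```

The gap is a trade-off: a larger gap means fewer round trips but more unused bytes per request, which is the knob the 64 MiB to 4 MiB tuning step adjusted.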
Why we stopped at -75%
Per-task probe+PQ is now 1.8 s of the 2.3 s batch wall-clock — CPU-bound, not I/O. Spark's task-per-core execution model already saturates all 8 cluster cores with 8 concurrent tasks, so intra-task parallelism (rayon across a batch's queries) would just steal from the other 7 tasks. Further gains require either more cluster cores, dropping to 4-bit PQ (better SIMD utilization), or a recall trade-off via lower nprobes/refine_factor. None of those are appropriate as default tunings; they're per-workload knobs.
Code changes shipped under Phase 1.6
- rust/lance/src/index/vector/external/parquet_source.rs: CoalescingParquetReader, ParquetMetaCache, open_parquet_cached
- rust/lance/src/index/vector/external/open.rs: OpenedExternalIndex owns the cache for its handle lifetime
- rust/lance/src/index/vector/external/search.rs: search_batch + per-batch log::info! timing diagnostics; search reduced to a thin wrapper over search_batch(&[query])
- rust/lance/src/index/vector/external/mod.rs: ExternalIvfPqIndex::search_batch public API
- java/lance-jni/src/external_index.rs: nativeSearchBatch
- java/src/main/java/org/lance/index/external/ExternalIvfPqIndex.java: searchBatch(float[][], k, nprobes, refineFactor, deletedRids)
- lance-spark-knn_2.12/src/main/scala/org/lance/spark/knn/internal/ExternalIndexProbe.scala: probeBatch
- lance-spark-knn_2.12/src/main/scala/org/lance/spark/knn/internal/ExternalFusedStage.scala: fusedPartition collects all queries before one batch probe call
- New test: search_batch_matches_per_query_search asserts batched + per-query results agree exactly.
Q8: Risks
Architectural
| Risk | Mitigation |
|---|---|
| Three paths are too many to maintain | Each path has clearly distinct preconditions; the routing rule is mechanical. The shared probe + merge + materialize pipeline (Phase 1.5 staged execs) is reused across A and B. C is intentionally simpler (1 stage, no shuffle). Total LoC is comparable to a single path with all features, just split across files. |
| Catalyst rule complexity | The rule is opt-in (spark.lance.knn.indexedNearestByJoin.enabled=false by default). Users who don't enable it get Spark's default crossJoin rewrite, with no behavior change. Once enabled, the rule's pattern match is conservative (refusal, not partial pushdown). |
| Cross-language test surface | Rust unit tests + JNI smoke tests + Scala integration tests cover the JNI surface. End-to-end tests in IndexedNearestJoinExternalTest exercise the full path. Cluster bench provides regression coverage. |
Implementation
| Risk | Mitigation |
|---|---|
| parquet-rs corner cases (encrypted parquet, dictionary-encoded, INT96 timestamps) | Document supported feature set; clear errors for unsupported. Encrypted parquet specifically is out of scope for Phase 1. |
| Concurrent reads on the same parquet file require separate Arc<ParquetObjectReader> clones (parquet-rs &mut self on get_bytes) | Phase 2 reader cache keyed on (file_id, session_id). Phase 1 constructs fresh per call: wasteful but simple. |
| Snapshot consistency drift between build and search | Phase 2 manifest fingerprint (size + mtime + footer hash) validation at open(). Documented "rebuild on snapshot change" workflow. |
| JVM parquet writers emit List<Float> not FixedSizeList<Float> | parquet_source::coerce_to_fsl accepts both; validates row-length uniformity. Phase 1. |
| Cross-compile complexity for the JNI .so | cargo zigbuild --target x86_64-unknown-linux-gnu --release works from macOS arm64. Documented for contributors. Upstream CI publishes multi-arch jars. |
Operational
| Risk | Mitigation |
|---|---|
| Disk quota on cluster scratch | The bench's cleanupSiblingScratchDirs uses a strict pattern match and sweeps only knn-bench-data-* siblings. Production users size spark.lance.knn.tempR.dir and spark.lance.knn.externalIndex.dir per their cluster. |
| Multi-tenant cluster variance dominates fine-grained comparisons | Bench reports run-to-run ranges. Median across multiple runs for headline. Honest about which deltas are within noise. |
| Index file format compatibility across lance-rs versions | Index uses standard Lance IVF-PQ encoding (pb::Index protobuf). The manifest format is versioned (manifest_version: 1). Both are tested for round-trip across the open path. |
Strategic
| Risk | Mitigation |
|---|---|
| Lance-c ecosystem maturity (Velox PR, Gluten support) overtakes the JNI path | The Catalyst rule's logical contract is engine-independent; lowering to a Velox-native exec when that ecosystem matures requires only a Strategy entry, not a redesign. The JVM JNI path keeps working alongside. |
| Spark 4.x evolution breaks the rule injection | The current rule targets Spark 4.2-SNAPSHOT for the NearestByJoin operator. For Spark 3.5, lance-spark uses a different Catalyst integration (the staged execs work in both). We test against multiple Spark versions in CI. |
Q9: Implementation status
Phase 1 (lance-rs): ✅ end-to-end shipped
- rust/lance/src/index/vector/external/: new module with mod.rs, types.rs, params.rs, parquet_source.rs, manifest.rs, build.rs, open.rs, search.rs, fetch.rs (~1,100 LoC)
- rust/lance/src/index/vector/ivf.rs: new pub fn write_ivf_pq_file_external(...), parallel to the existing write_ivf_pq_file_from_existing_index but with no &Dataset arg
- rust/lance/Cargo.toml: parquet promoted from dev-dep to runtime dep
- 14 unit tests + 1 integration test (tests/external_index_phase1.rs); all passing
- Branch: external-index-rfc-draft on sezruby/lance
Phase 1.5 (JNI + Java surface): ✅
- java/lance-jni/src/external_index.rs: nativeBuild, nativeOpen, nativeClose, nativeSearch, nativeFetchRows + accessors
- java/src/main/java/org/lance/index/external/: ExternalIvfPqIndex, ExternalIvfPqIndexParams, SearchResult, ParquetRowKey
- 5 JNI smoke tests; all passing
Phase 1.5 (lance-spark integration): ✅
Bulk join (Pattern 3):
- IndexedNearestJoinExternal.scala: public DataFrame API for df.kNearestJoin(parquetCorpus, ...)
- internal/ExternalIndexProbe.scala: Scala wrapper around the JNI handle
- internal/ExternalIndexLifecycle.scala: driver-side cache (SHA-256-keyed by file paths + col + params) + cleanup hook
- internal/ExternalFusedStage.scala: single-stage probe + materialize, source-size-driven parallelism
- internal/LanceVectorIndexBuilder.scala: promoted from test/ for bench use
- IndexedNearestJoinExternalTest.scala: end-to-end test passing
- benchmark/IndexedNearestJoinExternalBenchmark.scala: A vs B-narrow vs B-wide vs C-indexed-narrow vs C-indexed-wide vs E vs F (LSH)
Driver-side single-query API (Pattern 2):
- LanceParquetIndex.scala: build / buildIfMissing / open / search / searchToDF / fetchRowsToDF. Hooks into ExternalIndexLifecycle so the index file is shared with the bulk-join path within one Spark application.
- LanceParquetIndexTest.scala: 5 tests covering JNI round-trip, DataFrame schema shape, projection materialization, fetchRowsToDF caller order, and buildIfMissing cache reuse.
- Branch: knn-external-index on sezruby/lance-spark.
Phase 1.6 (cloud-storage perf): ✅ shipped
Optimization layer on top of Phase 1.5: same external-index API, same recall, and the abfss/cloud-storage path now takes ~75% less wall-clock time than the baseline without cloud-specific tuning (~12 s → ~3 s on wide-tiny-l10). See Q7.4 for the timeline and per-batch breakdown.
- parquet_source.rs: CoalescingParquetReader (tunable coalesce gap + parallelism), ParquetMetaCache (per-handle footer reuse)
- search.rs: search_batch for unioned per-task refinement; search reduced to a thin wrapper
- JNI + Java + Scala bindings for searchBatch / probeBatch
- ExternalFusedStage.fusedPartition issues one batched probe call per Spark task instead of N per-query calls
- Per-batch timing logs gated on LANCE_LOG=lance::index::vector::external=info for production diagnostics
What's NOT in Phase 1
- Catalyst rule routing for parquet/Delta direct scans → Path C (Phase 3)
- Persistent index reuse across Spark sessions (Phase 2; needs manifest fingerprint validation in lance-rs)
- HNSW / IVF-FLAT external builders (Phase 2/3)
- append() / compact() for incremental index updates (Phase 4)
- Distributed index build via Spark executors (Phase 4)
- Reader cache keyed on (file_id, session_id) (Phase 2)
Q10: Phasing
| Phase | Goal | LoC est | Status |
|---|---|---|---|
| 1 (lance-rs) | IVF-PQ external index over parquet, with refinement | ~1,100 + ~250 tests | ✅ done |
| 1.5 (JNI + Spark) | JNI bindings + Java surface + DataFrame API + bench | ~600 (Rust+Java) + ~700 (Scala) + ~450 tests | ✅ done |
| 1.6 (cloud perf) | search_batch + ParquetMetaCache + CoalescingParquetReader; abfss path 12s→3s on wide-tiny | ~250 + tests | ✅ done |
| 2 | Manifest fingerprint validation + persistent-index reuse + reader cache | ~400 LoC + tests | pending |
| 3 | Catalyst rule routing for Path C (parquet/Delta auto-detect) | ~400 LoC + tests | pending |
| 4 | Distributed index build (Spark executors) + append() / compact() | ~1,200 LoC + tests | pending |
Open questions
- Persistent index across Spark sessions. Phase 2 ships fingerprint validation; the lifecycle is per-application today. Should there be a Hive-metastore-style registration (spark.lance.knn.externalIndex.catalog) so multiple apps share an index? Or is it sufficient to expose the URI and let users wire their own catalog?
- Should Path C support Delta deletion vectors natively? Today the caller passes a RowFilter byte-array of deleted rids, materializing the bitmap from Delta's snapshot. Native Delta integration would need a lance-spark-delta companion. Probably belongs as a follow-up rather than core.
- Path B's temp dir cleanup contention. Multiple jobs writing to the same scratch root can race. Per-app namespacing exists; per-query namespacing would add safety. Worth it?
- probeParallelism > 1 for Path C. Path A supports this (fragment-grouped probe); Path C doesn't yet because Lance's idx.search() already merges across IVF partitions internally. But for very large |L| (millions), having multiple Spark tasks share the work for one left row would reduce per-row latency. Phase 3 candidate.
- HNSW external builder. Same shape as IVF-PQ but different on-disk encoding. Phase 3 — defer until use case is concrete.
- What's the right "stay in parquet vs migrate to Lance" decision rule for documentation? Current draft: "if R is queried >2 times and writes are infrequent, Path C wins on TCO; if R is queried once-and-discarded, Path B is fine; if you can migrate cleanly and keep R updated in Lance, Path A wins per-query latency."
References
This proposal's artifacts
Upstream
Background reading
- Spark NearestByJoin operator (Spark 4.2): the logical operator this work attaches a strategy to
- Spark Catalyst postHocResolutionRule injection point: where the rule lives in the analyzer/optimizer pipeline
- parquet-rs ≥58.3 page index APIs: PageIndexPolicy, RowSelection
- Lance's IVF-PQ index format: shared between Path A (Lance dataset) and Path C (external) — same on-disk encoding
SPIP: Lance-backed approximate nearest-neighbor join for Apache Spark — three complementary paths
📊 Measured results summary (added 2026-06-16)
Single-query retrieval (M=1, driver-side native
Dataset.newScan(...nearest), IVF-PQ)wikipedia-2023-11-embed-v3Sub-200 ms p50 at 100M means distributing a single indexed query through Spark cannot win — Spark's task-scheduling floor (~100–300 ms warm) alone exceeds the single-machine time. Online retrieval should stay single-machine / native.
Batch KNN-join, indexed (synthetic 100M × dim=128, IVF-PQ, M=5000 queries, 8×4 cluster)
mapPartitions→LanceProbe.probe, no shuffle)kNearestJoinstaged pipeline,probeParallelism=1)At indexed scale the staged 3-stage pipeline never beat the dumb
mapPartitionsshape — its shuffle + merge + second-scan materialize is pure overhead because per-query Lance work is already tiny.Batch KNN-join, no index (Cohere wiki 10M × dim=1024, brute-force, M=100, 8×4 cluster)
kNearestJoin,probeParallelism=8)kNearestJoin,probeParallelism=32)mapPartitions)Even where distribution genuinely pays (single-machine is 30 s/query brute-force), the minimal-JVM
mapPartitionsshape still beats the staged pipeline by ~2.8×. The staged pipeline's per-query fan-out (replicate L × fragments, shuffle, merge, materialize) createsM × N_frag × Kintermediate JVM rows that don't otherwise need to exist.Native batch-query API (lance-core #6828, shared flat-KNN scan)
Local (1M × dim=512, flat KNN): batched
Scanner::nearestover M queries shares one scan/decode and gives ~1.5× at M=10, ~1.7–1.8× plateau at M≥100 vs M independent queries. The win is bounded by the point where per-query SIMD distance compute matches the saved I/O; it grows on slower storage where the shared scan is the expensive component.Takeaway for design: keep the hot path in Lance Rust. Spark should be a thin fragment-dispatch + top-K-merge layer (à la PR lance-format/lance-spark#608); every Arrow→
InternalRow→shuffle→InternalRow→Arrow round-trip in the JVM is measured overhead, not speedup.Outline
Q1: What are we trying to do?
We want
leftDf.kNearestJoin(rightDf, leftVec, rightVec, k, ...)in Spark to run fast and at production scale, regardless of whatrightDfactually is. "Fast" means single-digit seconds at million-row R, not minutes. "Regardless" means: a Lance dataset, a parquet table, a Delta table, a wide-payload table, or an arbitrary upstream Spark subplan — the user shouldn't have to learn three different APIs.Concretely, the proposal adds three execution paths backed by Lance, plus a routing layer that picks the right one:
All three paths share the same probe-and-refine algorithm internally. The differences are about where the data lives and what migration cost the user can tolerate.
Beyond the bulk join, the same Lance primitives also power single-query retrieval — the RAG / "find the 10 nearest docs to this query" shape. The Rust core and JNI surface already accept a single query and return top-k; the Scala wrapper
LanceParquetIndex(PR #5) adds a driver-sidesearchToDFso the result composes with the rest of a Spark pipeline without paying Spark's ~100-300 ms scheduler floor. Section 4.6 covers all three caller patterns: any-JVM service (noSparkSession), Spark-driver single-query, Spark-distributed batched queries.The proposal also explains, component by component, why every piece of the hot path must live in Lance Rust (with SIMD distance kernels and page-index-aware parquet random access), not in Spark JVM code — and why a pure-Spark approach hits a wall well before production scale.
Q2: What this is NOT designed to solve
INSERT/UPDATEpropagation into the index is a Phase 4 problem, not Phase 1.Q3: How is it done today, and what are the limits?
3.1 Spark's brute-force
crossJoin— the natural-but-wrong defaultThis is what Spark 4.2's
RewriteNearestByJoinlowers to. The work it does:|L| × |R|rows. At|L|=1000, |R|=10Mthis is 10 billion (vector, vector) pairs.K × |L|.For a real workload (
|L|=1000, |R|=10M, dim=128), this is ~30+ minutes of cluster time at minimum, dominated by the 10B distance evaluations and the giant shuffle.This is the baseline every Spark user hits today. It's correct, it's exact, and it's unusable past tens of millions of pairs.
3.2 Spark MLlib BucketedRandomProjectionLSH
This is the "obvious" approximate alternative built into Spark MLlib.
Reading LSH.scala reveals what approxSimilarityJoin actually does. The cost analysis:
- Explode: |R| × numHashTables rows. At |R|=1M, numHashTables=5 that's 5M rows in explodedB.
- Join on hashValue: a Spark shuffle join keyed on the LSH bucket. The number of (L, R) collision pairs scales with |L| × |R| / num_buckets, summed across hash tables.
- Dedup: a distinct() that costs another shuffle.
At our wide-medium config (|L|=100, |R|=1M, dim=128, numHashTables=5) this is 30-60 seconds per query, dominated by the shuffle. At mega-medium (|R|=10M) it's minutes per query.
The fundamental problem isn't tuning. LSH was designed for streaming similarity joins, where you receive a continuous stream of pairs and want approximate threshold filtering. It is the wrong algorithm for the "given a query vector, return top-K from a static R" workload, which is exactly the kNN-by-join shape.
For that workload, IVF-PQ visits a fixed nprobes × partition_size candidates per query. That is sub-linear in |R| as long as num_partitions grows with |R|. LSH, by contrast, visits hash-bucket collisions that scale with |R|: linear at best, quadratic in the worst case if buckets collapse.
We left LSH in the bench harness behind BENCH_SKIP_LSH=true for honesty, but skipped it in cluster runs as not informative. The comparison would just confirm "LSH is much slower."
3.3 The Spark-native KNN wall
A natural question is: "Can't we just write IVF-PQ probe + refine in Spark?" The honest answer is no — the wall is built from six independent costs, each of which alone is enough to disqualify the approach.
3.3.1 JVM row population overhead
Spark's row representation for a FixedSizeList[Float] vector (an ArrayType(FloatType) column on the Spark side) is UnsafeArrayData stored inside an UnsafeRow. Reading a single dim=128 vector means:
- 128 getFloat(i) calls, each computing an offset and reading 4 bytes from the UnsafeRow
- a Vectors.dense(arr.toArray.map(_.toDouble)) conversion if calling MLlib, which allocates a new double[] and a DenseVector object
For a probe that touches K × refine_factor = 80 candidate vectors per query, that's 80 × 128 = 10,240 per-element reads plus 80 fresh DenseVector allocations PER QUERY. At |L|=1000 queries it's ~10M element reads and ~80K allocations per query batch, and that's before doing any actual distance computation.
Lance, by contrast, reads the entire candidate set as a flat Float32Array (one allocation, contiguous memory) and feeds it directly to a SIMD distance kernel. An AVX2 register holds 8 floats, so two FMA units process ~16 floats per cycle. The throughput difference is roughly 50-100×.
3.3.2 Shuffle bandwidth for partition-grouping
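If we tried to implement IVF probe in Spark, the natural shape would be to group R by IVF centroid and route each query only to the groups of its nprobes nearest centroids.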
But R doesn't naturally partition by IVF centroid — you'd have to repartition R first, which means shuffling all of R's vector data once before any query runs. At wide-medium that's ~500 MB; at mega it's ~5 GB. This shuffle is per-query unless R is pre-shuffled into a persistent dataset — which is exactly what a Lance dataset already does internally for free.
In other words: doing IVF-PQ in Spark requires recreating Lance's on-disk partition layout via Spark shuffles. You'd duplicate Lance's storage layout work in shuffle code that doesn't survive across queries.
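To make the layout concrete, here is a minimal Rust sketch of the grouping step an IVF build performs once and persists. It assigns every row of R to its nearest centroid and keeps one posting list per centroid. The sketch makes two assumptions:

- the centroids are taken as given (k-means training is omitted);
- the function names are illustrative, not Lance APIs.

A Spark-native IVF would have to reproduce exactly this grouping with a repartition of R.

```rust
/// Squared L2 distance between two equal-length vectors.
fn l2_sq(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).map(|(x, y)| (x - y) * (x - y)).sum()
}

/// Index of the centroid nearest to `x`.
fn nearest_centroid(x: &[f32], centroids: &[Vec<f32>]) -> usize {
    let (mut best, mut best_d) = (0, f32::INFINITY);
    for (i, c) in centroids.iter().enumerate() {
        let d = l2_sq(x, c);
        if d < best_d {
            best = i;
            best_d = d;
        }
    }
    best
}

/// One posting list of row ids per centroid: the on-disk partition layout
/// an IVF index stores once, instead of re-deriving it with a shuffle.
fn build_posting_lists(rows: &[Vec<f32>], centroids: &[Vec<f32>]) -> Vec<Vec<usize>> {
    let mut lists = vec![Vec::new(); centroids.len()];
    for (rid, x) in rows.iter().enumerate() {
        lists[nearest_centroid(x, centroids)].push(rid);
    }
    lists
}

/// Raw f32 vector bytes a one-off repartition of R would move.
fn shuffle_bytes(num_rows: u64, dim: u64) -> u64 {
    num_rows * dim * 4
}
```

At wide-medium, shuffle_bytes(1_000_000, 128) is 512,000,000 bytes, the "~500 MB" figure above.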
3.3.3 Distance kernel performance
A 128-dim L2 distance computation in optimal native code:
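As an illustration of the shape such a kernel takes, the loop below keeps eight independent accumulators over contiguous f32 slices. This is a portable sketch, not the lance-linalg implementation, which adds runtime CPU-feature dispatch and hand-tuned intrinsics. The independent lanes remove the serial dependency on a single running sum. That lets an optimizing compiler map the body onto SIMD registers when the target supports them. A row-at-a-time JVM UDF has no comparable structure to exploit.

```rust
/// Squared L2 distance using eight independent accumulator lanes.
/// The lane structure is what allows auto-vectorization; whether SSE,
/// AVX2, or NEON instructions are emitted depends on the compile target.
fn l2_sq_lanes(a: &[f32], b: &[f32]) -> f32 {
    assert_eq!(a.len(), b.len(), "vectors must have the same dimension");
    let mut acc = [0.0f32; 8];
    let chunks_a = a.chunks_exact(8);
    let chunks_b = b.chunks_exact(8);
    // Elements past the last full 8-wide chunk are handled separately.
    let (rem_a, rem_b) = (chunks_a.remainder(), chunks_b.remainder());
    for (ca, cb) in chunks_a.zip(chunks_b) {
        for i in 0..8 {
            let d = ca[i] - cb[i];
            acc[i] += d * d;
        }
    }
    let tail: f32 = rem_a.iter().zip(rem_b).map(|(x, y)| (x - y) * (x - y)).sum();
    acc.iter().sum::<f32>() + tail
}
```

At dim=128 the loop runs 16 full chunks with no tail.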
A 128-dim L2 distance in JVM Scala with a Math.fma UDF:
On top of that comes UDF call overhead per invocation (~50-200 ns per call), Scala closure deserialization, and codec inflation. Spark's Tungsten can vectorize some expression-level ops, but it does NOT vectorize user-supplied UDFs over ArrayType.
At 80 candidate distances per query × 1000 queries (the mega-medium |L|), Spark's 50-100× per-distance penalty turns ~5 ms of native L2 into ~250-500 ms of JVM L2. That alone disqualifies a Spark-native refine path.
3.3.4 No page-index-aware parquet random access in Spark
Refinement needs to read 80 specific (file, row_index) pairs out of a 1M-row parquet file with no full scan. parquet-rs ≥58.3 supports this via PageIndexPolicy::Required + RowSelection::from_consecutive_ranges + the parquet column and offset (page) indexes. The parquet reader:
parquet-mr (Spark's JVM parquet library) does NOT have this primitive. It supports row-group-level pruning, which skips whole row groups based on column min/max. Since 1.11 it also supports predicate-driven page skipping via column indexes. But it has no API for "read exactly these row indexes." A "fetch 80 rows by row index" operation on parquet-mr therefore decodes whole column chunks, which means reading and decompressing ~100K rows to extract one.
We'd need to add page-index-aware random access to parquet-mr to do refinement in Spark. That's a substantial parquet-mr contribution — and even if it lands, it's still JVM-level decompression + JVM row population (see 3.3.1) on the hot path.
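The selection step itself is simple once rows can be addressed by index. The minimal sketch below assumes the candidate row indices for one file are already known. It sorts and dedupes them, then collapses them into half-open consecutive ranges. This is the input shape parquet-rs's RowSelection::from_consecutive_ranges takes, as we understand that API (it also takes the file's total row count). The helper name to_consecutive_ranges is ours.

```rust
use std::ops::Range;

/// Collapse row indices into sorted, half-open, consecutive ranges.
fn to_consecutive_ranges(mut rows: Vec<usize>) -> Vec<Range<usize>> {
    rows.sort_unstable();
    rows.dedup();
    let mut out: Vec<Range<usize>> = Vec::new();
    for r in rows {
        // Extend the current range when r continues it; otherwise start a new one.
        if let Some(last) = out.last_mut() {
            if last.end == r {
                last.end = r + 1;
                continue;
            }
        }
        out.push(r..r + 1);
    }
    out
}
```

For example, rows [7, 3, 4, 5, 10, 4] become 3..6, 7..8, 10..11: three ranges instead of a scan of the whole file.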
3.3.5 No vector index file format in Spark
Spark has no concept of "an index file alongside a parquet table." There's no Catalyst plan node for "open this index file," no DataSource v2 hook for "the table has an associated index," and no catalog metadata for index existence. We'd need to invent all of this in lance-spark code, or persuade upstream Spark to take a PR with fundamental schema additions.
What we have today (Path C in this proposal) sidesteps all of that: the index file is a Lance artifact opened directly from the JNI handle, completely outside Catalyst's plan tree.
3.3.6 GC pressure under multi-query batches
The natural Spark code shape is "for each left row, run a probe, collect refs, then materialize." Each probe call allocates:
- a double[] for the query vector
- (distance, rid) candidate tuples
- an ArrayList[K] for top-K
At |L|=1000 × per-query allocations of ~1-10 KB, that's ~1-10 MB of short-lived object churn per query batch. Multiplied across the cluster, this triggers minor GC every few hundred ms in the executor JVMs and major GC every few seconds. Lance's Rust path has zero JVM heap allocation on the hot path: distance scoring is in-place on contiguous Vec<f32> buffers.
Combined: any single one of these costs is ~10× per-query overhead versus the Lance-native approach. Stacked together they're 100× to 1000×, easily the difference between a one-second per-query latency and a one-minute one. This is the wall.
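The allocation pattern on Lance's side can be sketched as a scratch buffer allocated once per task and reused for every query. In this minimal Rust sketch the type and method names are illustrative, not Lance's. It assumes candidates arrive as one flat, row-major f32 buffer. Scoring writes into a reused Vec, and select_nth_unstable_by isolates the k nearest without sorting every candidate. After warm-up, a query that fits in the existing capacity triggers no allocation in the scoring loop.

```rust
/// Scratch space reused across queries; `clear()` keeps its capacity.
struct TopKScratch {
    scored: Vec<(f32, u64)>, // (squared distance, rid)
}

impl TopKScratch {
    fn with_capacity(n: usize) -> Self {
        TopKScratch { scored: Vec::with_capacity(n) }
    }

    /// Score `candidates` (row-major, `query.len()` floats per row, non-empty
    /// query) and write the k nearest rids, nearest first, into `out`.
    fn top_k(&mut self, query: &[f32], candidates: &[f32], rids: &[u64], k: usize, out: &mut Vec<u64>) {
        let dim = query.len();
        self.scored.clear();
        for (v, &rid) in candidates.chunks_exact(dim).zip(rids) {
            let d: f32 = v.iter().zip(query).map(|(a, b)| (a - b) * (a - b)).sum();
            self.scored.push((d, rid));
        }
        out.clear();
        let k = k.min(self.scored.len());
        if k == 0 {
            return;
        }
        // O(n) partial selection, then sort only the k survivors.
        self.scored.select_nth_unstable_by(k - 1, |a, b| a.0.total_cmp(&b.0));
        self.scored[..k].sort_unstable_by(|a, b| a.0.total_cmp(&b.0));
        out.extend(self.scored[..k].iter().map(|&(_, rid)| rid));
    }
}
```

A task would create one TopKScratch and one output Vec, then call top_k for each of its queries.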
Q4: What is new in this approach?
4.1 The three-path architecture
We don't propose a single hot-path implementation. We propose three implementations of the same logical operation, with a Catalyst rule choosing the right one based on what rightDf is:
The user writes the same API in every case. Routing is automatic via a Catalyst postHocResolutionRule. There are three paths because R has three meaningfully different shapes in real Spark workloads, and a single hot path for all of them necessarily compromises somewhere.
4.2 Path A: Lance-native KNN (R is already in Lance)
Reuses lance-spark's existing IndexedNearestJoin (upstream issue #541).
Hot path inside Lance:
- Dataset.scan().nearest(query, k, refine_factor).execute()
- probe nprobes partitions, score PQ codes, keep the top K × refine_factor candidates
- return (rowid, distance) pairs
Spark side (3 staged execs):
The shuffle exists because Phase 1.5 supports probeParallelism > 1 (fragment-grouped probe), where multiple Spark tasks contribute partial top-K for the same left row. The merge stage's reduceByKey combines them.
When this path applies: R is already a Lance dataset. Optimal latency, no temp write, no extra storage.
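The combine step of that merge is a sorted-list merge truncated to K. This minimal sketch assumes each partial result is already sorted nearest-first as (distance, rid) pairs; merge_top_k is an illustrative name, not a lance-spark function. Merging two top-K lists and truncating again yields the top K of their union. Because of that, the function is associative and commutative up to tie order, which is what a reduceByKey combiner needs.

```rust
/// Merge two nearest-first partial top-K lists into one global top-K.
/// Entries are (distance, rid).
fn merge_top_k(a: &[(f32, u64)], b: &[(f32, u64)], k: usize) -> Vec<(f32, u64)> {
    let mut out = Vec::with_capacity(k.min(a.len() + b.len()));
    let (mut i, mut j) = (0, 0);
    while out.len() < k && (i < a.len() || j < b.len()) {
        // Take from `a` when `b` is exhausted or `a`'s head is at least as near.
        let take_a = j >= b.len() || (i < a.len() && a[i].0 <= b[j].0);
        if take_a {
            out.push(a[i]);
            i += 1;
        } else {
            out.push(b[j]);
            j += 1;
        }
    }
    out
}
```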
4.3 Path B: per-query temp Lance materialization (R is an arbitrary subplan)
The general-purpose answer for "R is anything else." See #2 for the full motivation.
Hot path:
- Trigger: kNearestJoin(left, right) where right doesn't unwrap to a Lance scan or a direct parquet scan
- Project right to (rid := monotonically_increasing_id(), rvec, ...payload) and write it to a temp Lance dataset under spark.lance.knn.tempR.dir
- Cleanup on ApplicationEnd + JVM shutdown hook
When this path applies: any non-trivial R. That includes parquet that has been filtered, joined, transformed, or computed; in-memory DataFrames; subqueries; and SQL scenarios with views.
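One property of the rid column is worth keeping in mind. Spark documents monotonically_increasing_id() as putting the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The ids are therefore unique and increasing, but not consecutive. The Rust sketch below only mirrors that documented layout (monotonic_id is an illustrative name). It shows why a rid should be treated as an opaque join key rather than a dense array offset.

```rust
/// Mirrors the documented bit layout of Spark's monotonically_increasing_id():
/// partition index in the upper 31 bits, per-partition record number in the
/// lower 33 bits.
fn monotonic_id(partition: u32, record: u64) -> i64 {
    assert!(partition < (1 << 31) && record < (1u64 << 33), "out of range for the layout");
    (((partition as u64) << 33) | record) as i64
}
```

The first row of partition 1 gets id 8,589,934,592 (2^33), no matter how few rows partition 0 had.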
Cost: the temp Lance write is real. At |R|=1M, dim=128, narrow projection, it's ~15-20 sec on a cluster (shared object store + Lance encoding). At |R|=10M it's ~2.5 min. This cost dominates Path B, and is the load-bearing reason Path C exists for the parquet case.
4.4 Path C: external index over parquet (R is a direct parquet/Delta scan)
The new path. Lance builds an IVF-PQ index directly over the source parquet files, no temp Lance dataset, no column copies. Lance owns the parquet reader.
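Because the index points into parquet files rather than into a Lance dataset, each entry needs a row id that names both the source file and the row inside it. The minimal Rust sketch below shows the (file_id_u32 << 32) | row_index_u32 encoding used at build time. It also shows resolution through the manifest's file list, which the sketch models as a plain slice of paths. The function names are illustrative.

```rust
/// Pack (file_id, row_index) into one u64: file id in the upper 32 bits,
/// row index within that parquet file in the lower 32 bits.
fn pack_rid(file_id: u32, row_index: u32) -> u64 {
    ((file_id as u64) << 32) | row_index as u64
}

fn unpack_rid(rid: u64) -> (u32, u32) {
    ((rid >> 32) as u32, rid as u32)
}

/// Resolve a rid to (file_path, row_index); `files` stands in for the
/// manifest's ordered parquet file list.
fn resolve<'a>(rid: u64, files: &'a [&'a str]) -> Option<(&'a str, u32)> {
    let (file_id, row) = unpack_rid(rid);
    files.get(file_id as usize).map(|&f| (f, row))
}
```

With 32 bits on each side, this encoding addresses up to 2^32 files with up to 2^32 rows each.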
Hot path:
Build (once per logical "table version", amortized across queries):
- Source read: parquet-rs with PageIndexPolicy::Required
- IVF-PQ training and partition assignment: IvfTransformer + shuffle_dataset
- Output: index.idx (standard Lance IVF-PQ encoding) + manifest.json (parquet file list)
- Row id: (file_id_u32 << 32) | row_index_u32
Search (per query):
- Keep K × refine_factor candidates with PQ-approx scores
- Map each candidate rid to (file_path, row_index)
- Open the parquet file with PageIndexPolicy::Required, use RowSelection::from_consecutive_ranges for the requested rows, and read the actual vectors
- Return Vec<SearchResult { file_path, row_index, distance }>
Materialize (post-topK fetch):
- Input: (file_path, row_index) pairs + projection columns
Spark side (1 fused exec, no shuffle):
Why no shuffle? Lance's idx.search() returns the already-merged global top-K for one query. The cross-partition merge happens inside Lance, not Spark. So the Path A shuffle, which exists to merge partial contributions from fragment-grouped probes, is vestigial here. The shuffle was inherited from the Path A pipeline; we removed it after benchmarking confirmed it was a passthrough.
When this path applies: R is spark.read.parquet(path) or spark.read.format("delta").load(path), possibly with a passthrough Project and/or supported Filter on top. Anything else (joins, computed columns, unsupported predicates) falls through to Path B.
4.5 The routing decision tree
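The routing can be summarized as a small decision function. The Rust sketch below is a hypothetical model of the rule's outcome, not its Catalyst implementation:

- RightShape stands for what the right side unwraps to after peeling SubqueryAlias / View / passthrough Project.
- The enabled flag mirrors spark.lance.knn.indexedNearestByJoin.enabled.

It follows 4.4: a parquet/Delta scan whose filters do not all translate falls through to Path B.

```rust
/// Hypothetical summary of what the right side of the join unwraps to.
enum RightShape {
    LanceScan,
    /// Direct parquet or Delta scan; true when every Filter on top translates.
    ParquetOrDeltaScan { filters_supported: bool },
    /// Joins, computed columns, in-memory data, subqueries, ...
    Other,
}

#[derive(Debug, PartialEq)]
enum Route {
    SparkCrossJoin, // rule disabled: Spark's default rewrite
    PathA,          // Lance-native KNN
    PathB,          // temp Lance materialization
    PathC,          // external index over parquet
}

fn route(enabled: bool, right: RightShape) -> Route {
    if !enabled {
        return Route::SparkCrossJoin;
    }
    match right {
        RightShape::LanceScan => Route::PathA,
        RightShape::ParquetOrDeltaScan { filters_supported: true } => Route::PathC,
        RightShape::ParquetOrDeltaScan { filters_supported: false } | RightShape::Other => Route::PathB,
    }
}
```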
User-visible: a single df.kNearestJoin(rightDf, ...) API. Three execution strategies under the hood.
4.6 Beyond join: single-query and batched-query retrieval
The same primitives that power the bulk join also serve a second use case that came up in feedback: single-query retrieval — one (or a small batch of) query vector(s) producing top-k rows. This is the RAG / search-bar / "nearest 10 docs to this prompt" shape. The bulk join is the wrong tool for it: shipping a 1-row DataFrame to executors, running one probe, shipping one row back is pure scheduler overhead.
The Rust core and JNI already accept a single query:
ExternalIvfPqIndex::search(query, k, nprobes, refine_factor, filter) -> Vec<SearchResult>. PR #5 ships a thin Scala wrapper, LanceParquetIndex, that adapts the JNI surface to DataFrame-returning entry points so the result composes with downstream Spark transforms. Three caller patterns, in increasing order of "Spark-ness":
Pattern 1: any JVM caller, no Spark dependency
Index files are self-contained in object storage (manifest.json + index.idx). Any JVM process can open and search:
No SparkSession, no org.apache.spark dependency on the classpath. Build the index once via Spark or an offline job, then serve queries from anywhere the JVM runs: a Trino UDF, a Presto plugin, a Java request handler, a Jupyter kernel. Per-query latency is bounded by IVF probe + 80 candidate refinement reads (~1-5 ms warm, ~50 ms cold-mmap).
This pattern is fully shipped today as part of Phase 1; no Spark wrapper is needed for it. We call it out in the SPIP because it's the load-bearing reason the index file format is engine-independent. Spark is the integration we're proposing here, but Lance's vector index over parquet is a primitive that other engines can reuse.
Pattern 2: Spark notebook / job, one query at a time
A user is in a Spark session (notebook, structured streaming sink, batch job) and wants top-k rows for a single query vector to feed into downstream Spark transforms (joins, projections, aggregations, UDFs). PR #5 ships LanceParquetIndex for this:
Why driver-side, not a Spark stage. A single probe is ~1-5 ms warm (the JNI call + IVF probe + 80 candidate refinements + Arrow IPC encode). Wrapping that in a parallelize(Seq(qvec)).mapPartitions { idx.search(...) } would cost at least Spark's ~100-300 ms scheduling floor, versus ~1-5 ms for the driver-local call. The Spark wrapping isn't just "no benefit": it's strictly slower by 30-100×.
So LanceParquetIndex.search and searchToDF both run on the driver. The returned DataFrame has 1 partition, because k is bounded and there's nothing to parallelize. From that point on it composes with the rest of the Spark plan, and that's the only reason it's a DataFrame at all rather than a Seq.
Returned schema:
| searchToDF form | Returned schema |
|---|---|
| searchToDF(qvec, k) (no projection) | (file_path STRING, row_index LONG, score FLOAT): keys + exact distance |
| searchToDF(qvec, k, projection = Seq("doc_id", "title")) | (doc_id ..., title ..., score FLOAT): payload columns from parquet + score |
When projection is non-empty, the wrapper issues idx.fetchRows(keys, projection) after search and joins the payload into the row. The schema for the projection columns is read from the parquet footer (cached on first call). One Arrow IPC round-trip; no Spark shuffle.
Index lifecycle is shared with the bulk-join path.
buildIfMissing keys the index file on sha256(sorted file paths + vector column + params). If the bulk-join path (IndexedNearestJoinExternal) already built an index over the same files in this Spark application, LanceParquetIndex.buildIfMissing returns immediately with the existing URI. Cleanup is application-scoped (SparkListenerApplicationEnd + JVM shutdown hook).
Cluster requirement. spark.lance.knn.externalIndex.dir must point at a shared filesystem (s3://, abfss://, hdfs://, ...) when running on a non-local master. The index file lives there, and any Spark task that later reads it needs cross-executor visibility. The wrapper fails fast at buildIfMissing time if this isn't set on a non-local master.
Filtering and deletes. search accepts a packed byte[] of deleted (file_id << 32) | row_index rids. This is useful for honoring Delta deletion vectors / Iceberg position deletes without rebuilding the index. The RowFilter plumbing is the same as the bulk-join path (Phase 1).
What this pattern is NOT. This is not for batched query workloads; see Pattern 3. The driver call is sequential: running 10K queries through it on the driver is 10K × ~3 ms = 30 sec single-threaded. For batches, use the bulk join, which distributes across executors.
Pattern 3: Spark-cluster caller, many independent queries
A batch of query vectors — e.g., 10K user queries to score against a 100M-row corpus, or "for each row in a small probe table, find its top-10 neighbors." This is already Path C with the role of L and R inverted: queries play the role of L (distributed), and the indexed corpus plays the role of R.
Nothing new is required for this: the join already covers it. The bulk-join path uses the same ExternalIndexLifecycle cache as LanceParquetIndex.buildIfMissing, so an index built once is shared across both. The only sugar we'd add later is a method-on-index alias for callers who prefer that shape:
Index lifetime is shared across all three patterns
The index built once, by any caller, can be opened by any of the three patterns. The manifest (manifest.json next to index.idx) is self-describing: the caller doesn't need to know how it was built. This is the load-bearing reason the index lives in object storage as a self-contained directory, not inside a Lance dataset directory tree. A maintenance script can build the index nightly; a Spark job, a Java service, and a Trino UDF can all read it concurrently.
Surface area (in PR #5 today unless noted)
| Type | Method | Notes |
|---|---|---|
| org.lance.index.external.ExternalIvfPqIndex | build / open / search / fetchRows | |
| LanceParquetIndex | build / buildIfMissing / open | buildIfMissing shares the SHA-256-keyed cache with IndexedNearestJoinExternal. |
| LanceParquetIndex | search(query, k, ...): Seq[SearchResult] | |
| LanceParquetIndex | searchToDF(query, k, projection?): DataFrame | With projection, materializes payload via Lance's parquet reader. |
| LanceParquetIndex | fetchRowsToDF(refs, projection): DataFrame | |
| IndexedNearestJoinExternal | apply(left, right, k, ...) | Backs df.kNearestJoin(...). |
| LanceParquetIndex | searchBatch(queries, qCol, k, ...): DataFrame | Later (alias only); kNearestJoin already covers the use case. |
The driver-side surface (Pattern 2) is exercised by LanceParquetIndexTest. Its 5 tests cover JNI round-trip, DataFrame schema shape, projection materialization, fetchRowsToDF ordering, and cache reuse with IndexedNearestJoinExternal. Recall is exercised in external_index_phase1.rs on the Rust side; the Scala test focuses on what the wrapper itself can break.
Q5: Why is each hot-path piece in Lance, not Spark?
This section answers the question more precisely than Q3.3 by walking through each step of the probe-refine-materialize loop and showing what Spark would have to do to replicate it.
5.1 SIMD distance kernels
Lance's lance-linalg crate ships AVX2 / NEON / FP16 implementations of L2, dot, and cosine distance. These are the kernels every probe call inside Lance hits. Examples from the Rust source:
- lance-linalg/src/distance/l2.rs::l2(): auto-detects target CPU features at runtime and dispatches to an AVX2 unrolled loop on x86_64
The equivalent in Spark JVM:
- udf((a: Seq[Float], b: Seq[Float]) => l2(a, b)): Tungsten cannot vectorize this UDF. Each pair is ~500-1000 cycles in JIT'd code.
- Built-in expressions (VectorL2Distance, VectorCosineSimilarity, VectorInnerProduct): these CAN go to Tungsten codegen, but the kernels are not SIMD-tuned and run in the JVM's bytecode interpreter or JIT'd scalar paths.
Honest comparison at dim=128: Lance's native kernel is 30-50 GFLOPS; Spark's UDF path is 0.5-2 GFLOPS depending on JIT. That makes Spark 20-50× slower per distance.
For one query × K × refine_factor = 80 distances at dim=128, that's:
Per query the gap is 20 μs, which is small. But at |L|=1000 queries × refine, it's 20 ms vs 0.7 ms of pure distance work, plus the UDF-call and row-population overhead per evaluation. And this is just one component of the wall.
5.2 Page-index-aware parquet random access
Refinement reads ~80 specific rows per query out of a 1M-row parquet file. The two ways to do this:
PageIndexPolicy::Required. Lance uses this.
Lance's path: ~330 KB I/O per query, decoded to ~80 vectors in microseconds.
Spark's parquet-mr path: ~50-250 MB I/O per query, decoded to one full row group worth of vectors, then 80 of them extracted. 150-700× more bytes read per query.
This isn't a Spark configuration issue. parquet-mr genuinely doesn't support page-level row selection. Building it is a months-long parquet-mr contribution + Spark integration. Lance just ships parquet-rs as a dep and gets the right behavior immediately.
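The page index is what turns "read these rows" into "read these pages." This minimal sketch assumes the first row index of every data page is available as a sorted slice starting at 0, which is what the parquet offset index records per page. The pages a query must fetch are then found by binary search, and every other page in the row group is never fetched. pages_to_read is an illustrative name.

```rust
/// `page_first_rows`: sorted first-row index of each page, starting at 0.
/// Returns the distinct page numbers that contain any requested row.
fn pages_to_read(page_first_rows: &[u64], rows: &[u64]) -> Vec<usize> {
    let mut pages: Vec<usize> = rows
        .iter()
        // The page holding row r is the last page whose first row is <= r.
        .map(|&r| page_first_rows.partition_point(|&first| first <= r) - 1)
        .collect();
    pages.sort_unstable();
    pages.dedup();
    pages
}
```

With 1,000-row pages, rows 5, 999, 2500, and 2600 touch only pages 0 and 2.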
5.3 PQ-encoded vector storage
Product Quantization (PQ) is an approximate vector encoding scheme:
- dim=128, num_sub_vectors=16
- the PQ codebook stores 256 representative codewords per 8-dim sub-vector
This compresses dim=128 vectors from 512 bytes to 16 bytes and makes distance scoring 32× faster (SIMD lookup vs full-vector L2).
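Below is a minimal Rust sketch of the scheme just described. It assumes trained codebooks are given, where codebooks[s][c] is codeword c of sub-space s. Encoding stores one byte per sub-vector. Scoring uses asymmetric distance computation:

1. A per-query table of sub-vector distances is built once.
2. Each candidate then costs num_sub_vectors lookups.

This illustrates the technique only; Lance's ProductQuantizer is organized differently and is SIMD-accelerated.

```rust
fn sq_dist(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).map(|(x, y)| (x - y) * (x - y)).sum()
}

/// Encode one vector: index of the nearest codeword in each sub-space.
fn pq_encode(x: &[f32], codebooks: &[Vec<Vec<f32>>]) -> Vec<u8> {
    let sub = x.len() / codebooks.len(); // e.g. 128 / 16 = 8 dims per sub-vector
    codebooks
        .iter()
        .enumerate()
        .map(|(s, book)| {
            let part = &x[s * sub..(s + 1) * sub];
            let (mut best, mut best_d) = (0usize, f32::INFINITY);
            for (c, w) in book.iter().enumerate() {
                let d = sq_dist(part, w);
                if d < best_d {
                    best = c;
                    best_d = d;
                }
            }
            best as u8 // <= 256 codewords per sub-space fit in one byte
        })
        .collect()
}

/// Per-query lookup table: distance from each query sub-vector to each codeword.
fn adc_table(q: &[f32], codebooks: &[Vec<Vec<f32>>]) -> Vec<Vec<f32>> {
    let sub = q.len() / codebooks.len();
    codebooks
        .iter()
        .enumerate()
        .map(|(s, book)| {
            let part = &q[s * sub..(s + 1) * sub];
            book.iter().map(|w| sq_dist(part, w)).collect()
        })
        .collect()
}

/// Approximate squared distance of one encoded candidate: one lookup per sub-space.
fn adc_score(code: &[u8], table: &[Vec<f32>]) -> f32 {
    code.iter().zip(table).map(|(&c, row)| row[c as usize]).sum()
}
```

At dim=128 with num_sub_vectors=16, each code is 16 bytes and each candidate score is 16 table lookups.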
Spark has no PQ. To replicate, we'd need:
All of which exists in Lance. Reimplementing in Spark is a 6-month project even if no one cares about SIMD performance — and at production scale you very much do care.
5.4 IVF posting list traversal
IVF stores K-means cluster centroids and a posting list per cluster. A query:
- score the query against all num_partitions centroids (one big distance kernel call)
- keep the nearest nprobes and walk their posting lists
This is a tight loop. Lance does it in a single JNI call (idx.search(query, k, ...) returns a Vec<SearchResult>).
In Spark, doing this would require:
The posting list is fundamentally a tight inner loop, not a Catalyst expression. Spark's architecture is built for big batched columnar operations; per-row tight loops with branches (like "is this candidate worth refining?") are precisely what falls off Tungsten codegen and into slow JVM interpretation.
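The probe loop described above fits in a few lines. This minimal Rust version uses illustrative names and models posting lists as in-memory Vec<u64> row ids. It ranks every centroid against the query and keeps the nprobes nearest. It then concatenates their posting lists into the candidate set that PQ scoring and refinement work on.

```rust
fn sq_dist(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).map(|(x, y)| (x - y) * (x - y)).sum()
}

/// Candidate rids for one query: posting lists of the `nprobes` nearest
/// centroids, nearest partition first.
fn ivf_probe(query: &[f32], centroids: &[Vec<f32>], posting_lists: &[Vec<u64>], nprobes: usize) -> Vec<u64> {
    // num_partitions distance evaluations against the centroids.
    let mut order: Vec<(f32, usize)> = centroids
        .iter()
        .enumerate()
        .map(|(p, c)| (sq_dist(query, c), p))
        .collect();
    order.sort_unstable_by(|a, b| a.0.total_cmp(&b.0));
    order
        .iter()
        .take(nprobes)
        .flat_map(|&(_, p)| posting_lists[p].iter().copied())
        .collect()
}
```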
5.5 Refinement — needs all of the above
Refinement is where 5.1, 5.2, 5.3, 5.4 stack:
If any one of these steps is in Spark (UDF, JVM, parquet-mr), the whole pipeline is bottlenecked by that step. There's no "do most of it in Lance and one piece in Spark" middle ground that performs well — the slowest step dominates.
This is why the Lance + Spark integration looks like "give Lance the input data, get the final results back" — not "do half the algorithm in one and half in the other."
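With the storage and kernel details abstracted away, the core of refinement is a two-stage rank. This minimal Rust sketch makes two stand-in assumptions, neither of which is a Lance interface:

- approximate (distance, rid) scores stand in for PQ output;
- the fetch closure stands in for the page-index-aware parquet read of 5.2.

Only the K × refine_factor best approximate candidates are fetched and re-scored exactly.

```rust
/// Keep the k * refine_factor best approximate candidates, fetch their full
/// vectors, re-score exactly, and return the exact top-k as (distance, rid).
fn refine(
    query: &[f32],
    approx: &[(f32, u64)],
    k: usize,
    refine_factor: usize,
    fetch: impl Fn(u64) -> Vec<f32>,
) -> Vec<(f32, u64)> {
    let mut shortlist = approx.to_vec();
    shortlist.sort_unstable_by(|a, b| a.0.total_cmp(&b.0));
    shortlist.truncate(k * refine_factor);
    let mut exact: Vec<(f32, u64)> = shortlist
        .iter()
        .map(|&(_, rid)| {
            let v = fetch(rid); // the random-access read happens here
            let d = v.iter().zip(query).map(|(a, b)| (a - b) * (a - b)).sum::<f32>();
            (d, rid)
        })
        .collect();
    exact.sort_unstable_by(|a, b| a.0.total_cmp(&b.0));
    exact.truncate(k);
    exact
}
```

A larger refine_factor lets the exact pass recover neighbors that quantization error pushed down the approximate ranking, at the cost of more fetches.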
5.6 Catalyst's reach: what we DID push into Spark planning
Spark contributes the parts it's actually good at:
- Catalyst rule (IndexedNearestByJoinRule): detects NearestByJoin(L, R, approx=true, ...), unwraps R through SubqueryAlias / View / passthrough Project, translates supported Filter predicates to a Lance-side prefilter SQL string, and decides routing among Paths A/B/C.
- Exchange between probe and merge stages (Path A's pipeline). AQE's CoalesceShufflePartitions, OptimizeSkewJoin, and OptimizeShuffleWithLocalRead engage on this exchange.
- LanceMaterializeExec emits RDD[InternalRow] matching NearestByJoin.output, then a top-level Project strips the synthetic __score column. Catalyst handles the rest of the plan's lifecycle (caching, Catalyst optimizations on the SURROUNDING plan, df.explain()).
- WHERE predicates that translate to Lance SQL get threaded through to Lance's index-side prefilter. Predicates that don't translate cause the rule to refuse the rewrite (refusal, not partial pushdown) and fall through to Spark's brute-force.
Spark does what it's good at: high-level distributed plan management, optimization rules, AQE, and lifecycle. Lance does what it's good at: SIMD distance, vector index, and parquet random access. The boundary between them is a thin RDD-level interface. This is the right decomposition.
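The all-or-nothing pushdown rule can be modeled as a translator that returns None as soon as any sub-predicate has no Lance SQL equivalent. The predicate type below is a hypothetical, heavily simplified stand-in for Catalyst expressions, and the SQL text is illustrative. The point is that a partial translation is never emitted.

```rust
/// Hypothetical, simplified predicate tree for Filters above R.
enum Pred {
    Eq(String, i64),
    Gt(String, i64),
    And(Box<Pred>, Box<Pred>),
    /// A UDF call or anything else with no Lance SQL equivalent.
    Opaque,
}

/// Some(sql) only if EVERY node translates; None means "refuse the rewrite".
fn to_lance_sql(p: &Pred) -> Option<String> {
    match p {
        Pred::Eq(col, v) => Some(format!("{col} = {v}")),
        Pred::Gt(col, v) => Some(format!("{col} > {v}")),
        // `?` propagates a failed child, so one opaque leaf rejects the whole tree.
        Pred::And(a, b) => Some(format!("({}) AND ({})", to_lance_sql(a)?, to_lance_sql(b)?)),
        Pred::Opaque => None,
    }
}
```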
Q6: Who cares?
Concrete users / use cases this proposal unlocks:
Filter(Join(parquet, delta), ...): supports any DataFrame. (Path B.)
The bigger picture: vector data is becoming a first-class column type in data warehouses (Snowflake's VECTOR, BigQuery's EMBEDDING_DISTANCE, Postgres pgvector). Spark needs to be in this story or it gets routed around. This proposal puts Spark on competitive footing without forcing users to migrate to a vector-native format.
Q7: Benchmarks
7.1 Methodology
Cluster: Spark 3.5, 16 executors × 4 cores requested, shared file:// scratch volume.
Before timing, the bench runs an executor-pool CPU probe: a fixed-cost compute loop on every task slot. It prints the per-executor median compute time. It aborts (or warns) if slowest_executor_median / fastest_executor_median > 1.25×, so reviewers can confirm the run was on a uniform pool before trusting the deltas. The numbers below were collected on a confirmed-uniform pool (1.01× spread on both scales).
Workload parameters:
- dim=128, K=10, metric=L2
- nprobes=16, refine_factor=8 (search-time)
- max_iters=50
Configs (all run on the SAME data in the SAME job per scale):
- kNearestJoin, project rid only
- kNearestJoin, project rid + 16 payload columns
- rid only (apples-to-apples reference)
- rid + 16 payload
- IndexedNearestJoinExternal over the same parquet files), project rid only
- BucketedRandomProjectionLSH baseline (skipped via BENCH_SKIP_LSH=true for cluster runs; see 3.2)
7.2 Results
Both scales were collected on a single cluster session (one job per scale) with 6 executors active and the pool-uniformity gate passing (≤1.01× spread). Numbers are from 2 measured runs per config (1 warmup discarded).
Wide-medium (|R|=1M, |L|=100)
Speedups: E vs B-narrow = 20.3×. E vs C-indexed = 1.76× faster.
Mega-medium (|R|=10M, |L|=1000)
Per-query (dividing by |L|=1000):
Speedups: E vs B-narrow = 12.5×. E vs C-indexed = 1.03× slower (~equivalent — both runs of each config land within ~2% of each other).
Stages and tasks (mega-medium, observed in run logs)
E is the simplest pipeline. The fused stage (Path C) was a deliberate architectural choice after benchmarking showed the inherited Path A shuffle was vestigial here.
Per-run consistency (uniform-pool runs)
The pool-uniformity gate keeps run-to-run spread tight enough for sub-second deltas to be meaningful.
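The gate amounts to a ratio of medians. The minimal Rust sketch below assumes per-executor lists of timings from the fixed-cost probe loop. It is not the ExecutorCpuCheck code, only the check as described in 7.1.

```rust
/// Median of a non-empty list of timings (ms).
fn median(mut xs: Vec<f64>) -> f64 {
    xs.sort_by(|a, b| a.total_cmp(b));
    let n = xs.len();
    if n % 2 == 1 { xs[n / 2] } else { (xs[n / 2 - 1] + xs[n / 2]) / 2.0 }
}

/// Returns (uniform?, spread) where spread = slowest median / fastest median.
fn pool_is_uniform(per_executor_ms: &[Vec<f64>], max_spread: f64) -> (bool, f64) {
    let medians: Vec<f64> = per_executor_ms.iter().map(|v| median(v.clone())).collect();
    let slowest = medians.iter().cloned().fold(f64::MIN, f64::max);
    let fastest = medians.iter().cloned().fold(f64::MAX, f64::min);
    let spread = slowest / fastest;
    (spread <= max_spread, spread)
}
```

With max_spread = 1.25, two executors with medians of 100 ms and 101 ms pass (spread 1.01), while 100 ms vs 140 ms fails.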
7.3 What the numbers mean — and what they don't
Reproducible findings (uniform-pool, single session per scale):
nprobes × partition_size + K × refine_factor, not by |R|.
Why E might be slightly faster than C-indexed at smaller scale:
(leftId, ScoredRowRef) for K × probeParallelism × |L| refs through Spark's network stack. Path C keeps refs in-JVM.
What these numbers do NOT prove:
approxSimilarityJoin's O(|R|) shuffle cost.
Methodology footnote. The pool-uniformity probe (ExecutorCpuCheck) is part of IndexedNearestJoinExternalBenchmark and prints a per-executor median table at the start of every run. Re-running the bench in your own cluster will produce a similar artifact you can use to decide whether the run's numbers are trustworthy.
7.4 Cloud-storage (abfss) follow-up: Phase 1.6 perf optimizations
The 7.1/7.2 numbers above are uniform-pool runs against file:// cluster scratch, with no cloud-storage round-trip per refinement read. When Path C runs against abfss:// source parquet on Databricks (DBR 16.4 LTS, 2 workers × 4 cores), the per-query refinement I/O dominates:
- each query's 80 candidate vectors land on ~30-40 parquet pages within the row group;
- the page-index-aware reader coalesces them into one ~50 MB byte-range fetch;
- the abfss round-trip cost shows up directly in wall-clock.
Tracking this on wide-tiny-l10 (|R|=100K, |L|=100, dim=128, 16 payload cols) on a 2-worker × 4-core DBR cluster:
- CoalescingParquetReader with 64 MiB gap + 32-parallel coalesce_ranges
- ParquetMetaCache (per-task footer + page-index reuse across queries)
- SIMD PQ via compute_distances
- search_batch: per-task unioned refinement read
Local-fs reference for the same scale: ~1,200 ms. After the batch fix the abfss path is ~2.5× over local-fs (vs. ~10× before). That matches what we expected: the residual gap is now CPU-bound on probe+PQ, not I/O.
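The first step in that timeline merges nearby byte ranges so that one request covers several pages. This is a standalone illustration of gap-based coalescing, the same idea as the object_store crate's coalesce_ranges. It is not the CoalescingParquetReader code, and it ignores the parallel-fetch half of the change.

```rust
use std::ops::Range;

/// Merge byte ranges separated by at most `max_gap` bytes, trading some
/// over-read for fewer object-store round trips.
fn coalesce(mut ranges: Vec<Range<u64>>, max_gap: u64) -> Vec<Range<u64>> {
    ranges.sort_unstable_by_key(|r| r.start);
    let mut out: Vec<Range<u64>> = Vec::new();
    for r in ranges {
        if let Some(last) = out.last_mut() {
            if r.start <= last.end.saturating_add(max_gap) {
                last.end = last.end.max(r.end);
                continue;
            }
        }
        out.push(r);
    }
    out
}
```

With a 64 MiB gap, page reads scattered within a row group typically collapse into one request per row group. With a 1 MiB gap on the same layout they may stay separate, each paying a 30-50 ms abfss round trip.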
What changed in each step
CoalescingParquetReader. parquet-rs's default coalesce gap of 1 MiB (inherited from object_store's OBJECT_STORE_COALESCE_DEFAULT) and its hardcoded 10-way parallelism are tuned for cluster-local NVMe. On abfss, where each round-trip is 30-50 ms, we wrap ParquetObjectReader to merge ranges with a tunable gap and fetch with a tunable concurrency.
ParquetMetaCache. Every per-query open_parquet_async was re-doing ObjectStore::from_uri + head + footer + page-index fetch (3 abfss round trips). The cache lives on OpenedExternalIndex and stores (ObjectStore, Path, file_size, ArrowReaderMetadata). On a hit, ParquetRecordBatchStreamBuilder::new_with_metadata skips all three round trips. This is the same shape as Spark's ParquetIOMetadataCache, DuckDB's parquet_metadata_cache, and parquet-rs's own new_with_metadata example.
SIMD PQ via compute_distances. We replaced the per-row scalar scoring loop in search.rs with ProductQuantizer::compute_distances, which dispatches to the SIMD distance kernels in lance-index. This required transposing PQ codes from row-major (on-disk) to column-major before scoring; one transpose per partition is far cheaper than N scalar scoring loops. The net win at 8-bit PQ was small (the SIMD path mostly fires for 4-bit), but the code now dispatches to the right kernel for any future change in PQ width.
search_batch: the dominant fix. It adds pub async fn search_batch(opened, queries, k, nprobes, refine_factor, filter) -> Vec<Vec<SearchResult>> in lance::index::vector::external::search. Matching bindings are JNI (nativeSearchBatch), Java (ExternalIvfPqIndex.searchBatch), and Scala (ExternalIndexProbe.probeBatch). The fused Spark stage (ExternalFusedStage.fusedPartition) now collects the entire partition's left rows + queries first, then issues ONE batched probe call. Inside Lance, the batch path:
- unions and dedups (file_path, row_in_file) pairs across all queries
Per-task cost goes from N × (probe + pq + refine_io) to N × (probe + pq) + refine_io. With a 12-13-query/task batch, the I/O term collapses by ~12×.
Per-batch breakdown (from RUST_LOG=lance::index::vector::external=info driver/executor logs): refine_files=1 always (one source parquet at this scale); candidate_pairs=960 (12 queries × 80 refine candidates). After dedup the per-file refinement read covers ~920 unique rows.
Why we stopped at -75%
Per-task probe+PQ is now 1.8 s of the 2.3 s batch wall-clock: CPU-bound, not I/O. Spark's task-per-core execution model already saturates all 8 cluster cores with 8 concurrent tasks, so intra-task parallelism (rayon across a batch's queries) would just steal from the other 7 tasks. Further gains require one of the following:
- more cluster cores;
- dropping to 4-bit PQ (better SIMD utilization);
- a recall trade-off via lower nprobes / refine_factor.
None of those are appropriate as default tunings; they're per-workload knobs.
Code changes shipped under Phase 1.6
- rust/lance/src/index/vector/external/parquet_source.rs: CoalescingParquetReader, ParquetMetaCache, open_parquet_cached
- rust/lance/src/index/vector/external/open.rs: OpenedExternalIndex owns the cache for its handle lifetime
- rust/lance/src/index/vector/external/search.rs: search_batch + per-batch log::info! timing diagnostics; search reduced to a thin wrapper over search_batch(&[query])
- rust/lance/src/index/vector/external/mod.rs: ExternalIvfPqIndex::search_batch public API
- java/lance-jni/src/external_index.rs: nativeSearchBatch
- java/src/main/java/org/lance/index/external/ExternalIvfPqIndex.java: searchBatch(float[][], k, nprobes, refineFactor, deletedRids)
- lance-spark-knn_2.12/src/main/scala/org/lance/spark/knn/internal/ExternalIndexProbe.scala: probeBatch
- lance-spark-knn_2.12/src/main/scala/org/lance/spark/knn/internal/ExternalFusedStage.scala: fusedPartition collects all queries before one batch probe call
- search_batch_matches_per_query_search asserts batched + per-query results agree exactly.
Q8: Risks
Architectural
spark.lance.knn.indexedNearestByJoin.enabled=false by default). Users who don't enable it get Spark's default crossJoin rewrite, with no behavior change. Once enabled, the rule's pattern match is conservative (refusal, not partial pushdown).
IndexedNearestJoinExternalTest exercise the full path. Cluster bench provides regression coverage.
Implementation
Arc<ParquetObjectReader> clones (parquet-rs &mut self on get_bytes)
(file_id, session_id). Phase 1 constructs fresh per call: wasteful but simple.
open(). Documented "rebuild on snapshot change" workflow.
List<Float> not FixedSizeList<Float>
parquet_source::coerce_to_fsl accepts both; validates row-length uniformity. Phase 1.
cargo zigbuild --target x86_64-unknown-linux-gnu --release works from macOS arm64. Documented for contributors. Upstream CI publishes multi-arch jars.
Operational
cleanupSiblingScratchDirs strict pattern match, sweeps knn-bench-data-* siblings only. Production users size spark.lance.knn.tempR.dir and spark.lance.knn.externalIndex.dir per their cluster.
pb::Index protobuf). The manifest format is versioned (manifest_version: 1). Both are tested for round-trip across the open path.
Strategic
NearestByJoin operator. For Spark 3.5, lance-spark uses a different Catalyst integration (the staged execs work in both). We test against multiple Spark versions in CI.
Q9: Implementation status
Phase 1 (lance-rs): ✅ end-to-end shipped
- rust/lance/src/index/vector/external/: new module with mod.rs, types.rs, params.rs, parquet_source.rs, manifest.rs, build.rs, open.rs, search.rs, fetch.rs (~1,100 LoC)
- rust/lance/src/index/vector/ivf.rs: new pub fn write_ivf_pq_file_external(...), parallel to the existing write_ivf_pq_file_from_existing_index but with no &Dataset arg
- rust/lance/Cargo.toml: parquet promoted from dev-dep to runtime dep
- tests/external_index_phase1.rs); all passing
- external-index-rfc-draft on sezruby/lance
Phase 1.5 (JNI + Java surface): ✅
- java/lance-jni/src/external_index.rs: nativeBuild, nativeOpen, nativeClose, nativeSearch, nativeFetchRows + accessors
- java/src/main/java/org/lance/index/external/: ExternalIvfPqIndex, ExternalIvfPqIndexParams, SearchResult, ParquetRowKey
Phase 1.5 (lance-spark integration): ✅
Bulk join (Pattern 3):
- IndexedNearestJoinExternal.scala: public DataFrame API for df.kNearestJoin(parquetCorpus, ...)
- internal/ExternalIndexProbe.scala: Scala wrapper around the JNI handle
- internal/ExternalIndexLifecycle.scala: driver-side cache (SHA-256-keyed by file paths + col + params) + cleanup hook
- internal/ExternalFusedStage.scala: single-stage probe + materialize, source-size-driven parallelism
- internal/LanceVectorIndexBuilder.scala: promoted from test/ for bench use
- IndexedNearestJoinExternalTest.scala: end-to-end test passing
- benchmark/IndexedNearestJoinExternalBenchmark.scala: A vs B-narrow vs B-wide vs C-indexed-narrow vs C-indexed-wide vs E vs F (LSH)
Driver-side single-query API (Pattern 2):
- LanceParquetIndex.scala: build / buildIfMissing / open / search / searchToDF / fetchRowsToDF. Hooks into ExternalIndexLifecycle so the index file is shared with the bulk-join path within one Spark application.
- LanceParquetIndexTest.scala: 5 tests covering JNI round-trip, DataFrame schema shape, projection materialization, fetchRowsToDF caller order, and buildIfMissing cache reuse.

Branch: knn-external-index on sezruby/lance-spark.

Phase 1.6 (cloud-storage perf): ✅ shipped
An optimization layer on top of Phase 1.5. The external-index API and recall are unchanged, and the abfss/cloud-storage path is now ~75% faster than the baseline without cloud-specific tuning. See Q7.4 for the timeline and per-batch breakdown.
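On cloud storage, much of the latency comes from request count rather than bytes. One common way to cut requests is to merge nearby byte ranges so that one GET serves several reads. The sketch below shows that idea under a tunable gap, assuming each read is a byte range within a single file. ByteRange and coalesce_ranges are hypothetical names for illustration, not the actual CoalescingParquetReader API in parquet_source.rs, and the parallel fetch of the merged ranges is left out.

```rust
/// A half-open byte range [start, end) within one file (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ByteRange {
    pub start: u64,
    pub end: u64,
}

/// Merge ranges that overlap, touch, or sit within `max_gap` bytes of each
/// other, so one object-store GET can serve several nearby reads.
/// Larger `max_gap` => fewer requests, but more over-read bytes.
pub fn coalesce_ranges(mut ranges: Vec<ByteRange>, max_gap: u64) -> Vec<ByteRange> {
    // Empty ranges need no I/O.
    ranges.retain(|r| r.end > r.start);
    // Sort by start offset so a single left-to-right pass suffices.
    ranges.sort_by_key(|r| r.start);

    let mut merged: Vec<ByteRange> = Vec::with_capacity(ranges.len());
    for r in ranges {
        match merged.last_mut() {
            // Close enough to the previous request: extend it.
            Some(last) if r.start <= last.end.saturating_add(max_gap) => {
                last.end = last.end.max(r.end);
            }
            // Too far away (or first range): start a new request.
            _ => merged.push(r),
        }
    }
    merged
}

fn main() {
    let reads = vec![
        ByteRange { start: 0, end: 100 },
        ByteRange { start: 150, end: 200 },
        ByteRange { start: 10_000, end: 10_500 },
    ];
    // With a 64-byte gap, the first two reads share one request.
    let requests = coalesce_ranges(reads, 64);
    println!("{:?}", requests);
}
```

The gap is the knob that trades request count against wasted bandwidth. On high-latency object stores, a gap of a few KiB is usually cheaper than an extra round trip. On local disk, a small or zero gap avoids reading bytes that are never decoded.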
- parquet_source.rs: CoalescingParquetReader (tunable coalesce gap + parallelism), ParquetMetaCache (per-handle footer reuse)
- search.rs: search_batch for unioned per-task refinement; search reduced to a thin wrapper
- searchBatch / probeBatch
- ExternalFusedStage.fusedPartition issues one batched probe call per Spark task instead of N per-query calls
- LANCE_LOG=lance::index::vector::external=info for production diagnostics

What's NOT in Phase 1
- append()/compact() for incremental index updates (Phase 4)
- (file_id, session_id) (Phase 2)

Q10: Phasing
append()/compact()

Open questions
- spark.lance.knn.externalIndex.catalog) so multiple apps share an index? Or is it sufficient to expose the URI and let users wire their own catalog?
- RowFilter byte-array of deleted rids: the caller materializes the bitmap from Delta's snapshot. Native Delta integration would need a lance-spark-delta companion, so this probably belongs in a follow-up rather than in core.
- probeParallelism > 1 for Path C. Path A supports this (fragment-grouped probe). Path C doesn't yet, because Lance's idx.search() already merges across IVF partitions internally. But for very large |L| (millions), having multiple Spark tasks share the work for one left row would reduce per-row latency. Phase 3 candidate.

References
This proposal's artifacts
Upstream
- NearestByJoin work that's the foundation of Path A

Background reading
- NearestByJoin operator (Spark 4.2): the logical operator this work attaches a strategy to
- postHocResolutionRule injection point: where the rule lives in the analyzer/optimizer pipeline
- PageIndexPolicy, RowSelection