
Per-query temp Lance write — generalize indexed NearestByJoin to non-Lance right side #2

@sezruby


Summary

Today, lance-spark-knn's indexed NearestByJoin path (#541) only fires when the right side of the join is a Lance scan. This issue extends it to any DataFrame R — including the result of an arbitrary upstream Spark plan (parquet, delta, joins, filters, computed columns, anything) — by materializing R into a per-query temp Lance dataset before the probe.

Catalyst rule sees: NearestByJoin(L, R, ...)

If R is already a Lance scan:
    existing path                                              (no change)

Else:
    inject  Project(rid := monotonically_increasing_id(),
                    ...{cols projected from R by parent plan}, rvec)
    inject  LanceTempWriteExec(R-with-rid)                    (NEW)
       → temp Lance dataset on shared scratch storage
    Probe → Merge → Materialize against temp Lance            (existing)
    SparkListener cleanup on job end                          (NEW)

The probe / merge / materialize pipeline is unchanged. The main new piece is a LanceTempWriteExec plan node that drives R's execution to a temp Lance URI, plus minor rule machinery to inject it and a listener to clean it up.

Headline implication: Lance becomes a query-time vector kernel for any DataFrame, not just Lance-native datasets. Users don't have to migrate their data to Lance to get the indexed NearestByJoin path.

TL;DR

| Aspect | Today | After this issue |
|---|---|---|
| Right side must be | Lance scan | any DataFrame |
| Per-query setup cost | none (R already on disk) | one Lance temp write of R's projection |
| Maintenance | none | none (temp scoped to query lifecycle) |
| Materialize cost | unchanged | unchanged (point-fetch on temp Lance) |
| Shuffle | only the existing _leftId ref-shuffle | same |
| New code | | ~600 LoC additive in lance-spark-knn |

The cost of the temp write is real — seconds on shared object storage at typical scales — but the architecture stays simple because there's no shuffle added, no second-side scan, no maintained sidecar, no schema evolution to track.

Why this shape

Several alternative architectures were considered and ruled out for specific reasons. Documenting the reasoning here so reviewers can push back where they disagree.

What's hard about non-Lance R

When R is a Lance scan, three things are true that make the indexed path clean: rids are stable (_rowid), random access is cheap (Lance's whole point), and the data is already in Lance's columnar layout. None of those hold when R is a parquet/delta scan, or worse, the result of an arbitrary DAG (Filter(Join(parquet, delta), ...) etc.).

So the question is: where does the right-side data come from for the probe and materialize stages?

Materialize options considered

A few options were on the table. With "R may be an arbitrary subplan" as the primary constraint, two of the three drop out:

| Option | Materialize step | Issue |
|---|---|---|
| M1: Carry projection in temp Lance | Lance _rowid IN (...) point-fetch on temp R | None; works on any R, including subplans |
| M2: Skinny temp + post-probe hash join with R | BroadcastHashJoin / SortMergeJoin of survivors against R | Requires re-running R as the join's right side. For subplan R, this re-executes the entire upstream DAG. Even for stable parquet R, it pays a full scan per query and shuffles for SMJ. |
| M3: Custom inverted-index shuffle | RDD-level rid-keyed shuffle | Reimplements what Spark's hash join already does. AQE-invisible. Spark's join is the inverted index. |

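M1 is the only option that handles subplan R correctly: M2 must re-execute the upstream DAG, which is expensive and, for nondeterministic subplans, is not guaranteed to reproduce the same rows. With M1 the temp write happens once per query, the materialize is zero-shuffle, and the materialize point-fetches happen inside Lance where they're fast, which is exactly the pattern the existing pipeline is built around.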

The earlier sidecar discussion in NEARESTBYJOIN_ANN_PROPOSAL.md section "Three materialize strategies" stays valid as a description of the design space, but it framed the sidecar over an entire primary table as the unit. The per-query temp shape is finer-grained: one temp Lance dataset per query, scoped to that query's lifecycle, no maintenance.

What temp Lance write actually costs

The LanceWriteBenchmark was added to measure this directly. Spark InternalRow → df.write.format("lance").save() throughput on M5 Max, local NVMe, JDK 17, Spark 3.5.5, lance-core 6.0.0-beta.4:

scale         rows    dim   med ms   vec MB/s   krows/s   Δ noop
─────────────────────────────────────────────────────────────────
tiny       100,000    128       40      1,221     2,500       -3
small    1,000,000    128      127      3,845     7,874     -108
medium     100,000  1,024      122      3,202       820      +34
fat      1,000,000  1,024    1,181      3,308       847     +498

Δ noop = lance write median minus df.write.format("noop") baseline, i.e. the pure sink cost once row generation is accounted for. For the fat case (1M × dim=1024 = 4 GB raw vector bytes), the write itself is ~500 ms. Local NVMe is the lower bound, not the upper bound.

On shared object storage (S3, ABFS, GCS), single-stream throughput drops to ~70-130 MB/s; aggregate parallel writes reach ~600 MB/s to 1 GB/s with 8 streams. So, with parallel writes, the fat case becomes ~5-15 s on shared storage. That's the real per-query overhead.
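The back-of-envelope arithmetic can be written out as a small Scala helper. It is a sketch: it assumes float32 vectors (4 bytes per element, consistent with the LanceWriteBenchmark MB/s column) and divides only the raw vector payload by an assumed throughput, ignoring dataset-open and manifest costs.

```scala
object TempWriteEstimate {
  // Raw vector payload in bytes, assuming float32 (4-byte) vector elements.
  def vectorBytes(rows: Long, dim: Int): Long = rows * dim * 4L

  // Seconds to push `bytes` through a sink at `mbPerSec` (decimal MB, 1 MB = 1e6 bytes).
  def seconds(bytes: Long, mbPerSec: Double): Double = bytes / (mbPerSec * 1e6)

  def main(args: Array[String]): Unit = {
    val fat = vectorBytes(rows = 1000000L, dim = 1024) // 4,096,000,000 bytes
    // Single-stream (70, 130) and 8-stream aggregate (600, 1000) rates from the text.
    for (mbps <- Seq(70.0, 130.0, 600.0, 1000.0))
      println(f"$mbps%6.0f MB/s -> ${seconds(fat, mbps)}%6.1f s")
  }
}
```

Single-stream rates put the fat case at roughly 30-60 s of pure transfer, while the 8-stream aggregate rates give roughly 4-7 s, which is why the ~5-15 s figure assumes parallel writes.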

Comparison points to put this in context:

  • Brute-force RewriteNearestByJoin on the same data is tens of seconds to minutes for non-trivial scales (existing benchmarks in BENCHMARK_RESULTS.md).
  • Building a vector index after temp write is tens of seconds (cluster numbers in same doc).
  • The probe itself is ~0.5 s at the same scale.

So a 5-15 s temp write is small relative to either alternative (the brute-force baseline this replaces, or the index build that the temp write enables). It's a real cost but not a deal-breaker.

End-to-end local validation

A new benchmark (IndexedNearestJoinTempRBenchmark) exercises the proposed path end-to-end on the same data in the same job, so all configs are directly comparable with no cross-run noise. Four configs:

  • A: Spark crossJoin + min_by_k on parquet R — what users write today on parquet R, matches what Spark 4.2's RewriteNearestByJoin lowers to
  • B: per-query temp Lance write + kNearestJoin against the temp URI (manually spelled out) — the design under test
  • C: Lance-native R + kNearestJoin — same probe pipeline as B but R was pre-written to Lance once outside the timing loop
  • D: kNearestJoin(parquetDf, ...) — exercises the public API code path that internally calls the new temp-materialization helper. (B − D) should be near zero — sanity check that the public API has no extra overhead vs. the manual equivalent.

Local — Apple M5 Max, tiny scale (5 repeats, |R|=100K, |L|=100, dim=128, K=10)

config                                              median ms
────────────────────────────────────────────────────────────────
A: Spark crossJoin + min_by_k (parquet R)              28,231
B: temp Lance write + kNearestJoin (manual)               323     (36 ms temp write + 287 ms probe)
C: Lance-native R + kNearestJoin (reference)              267
D: kNearestJoin(parquetDf) — built-in temp                319

Reads:

  • B beats A by 87×. The new code path replaces the brute-force baseline cleanly.
  • (B − C) = 56 ms is the pure cost of the per-query temp path vs. already-Lance R: 36 ms of df.write.format("lance").save() + ~20 ms of dataset open/manifest read on the read-back. That's the user-visible price of "we don't require Lance-native storage."
  • (D − B) = -3 ms (within noise). The public df.kNearestJoin(parquetDf) API has no overhead vs. the hand-written equivalent, which shows the wiring adds no hidden work (correctness is covered separately by the oracle check).
  • Oracle equivalence held — every config's top-K matches the in-memory brute-force ground truth on a 16-row left subset.
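The oracle check compares each config's top-K against an exact in-memory scan. A minimal Scala sketch of such an oracle, assuming squared-L2 distance and ties broken by the smaller right-side id (the benchmark's actual metric and tie-breaking are not stated here); `bruteForceTopK` and `matchesOracle` are hypothetical helpers, not part of lance-spark-knn:

```scala
object BruteForceOracle {
  // Squared Euclidean distance; monotone in L2 distance, so the ranking is identical.
  def sqL2(a: Array[Float], b: Array[Float]): Double = {
    require(a.length == b.length, "dimension mismatch")
    var s = 0.0
    var i = 0
    while (i < a.length) { val d = a(i).toDouble - b(i); s += d * d; i += 1 }
    s
  }

  // Exact top-K right-side ids for one query vector: nearest first, ties by smaller id.
  def bruteForceTopK(query: Array[Float], right: Seq[(Long, Array[Float])], k: Int): Seq[Long] =
    right
      .map { case (id, v) => (sqL2(query, v), id) }
      .sortWith { (x, y) =>
        val c = java.lang.Double.compare(x._1, y._1)
        if (c != 0) c < 0 else x._2 < y._2
      }
      .take(k)
      .map(_._2)

  // True when the candidate's top-K id set equals the oracle's for every probed left row.
  def matchesOracle(left: Seq[Array[Float]], right: Seq[(Long, Array[Float])], k: Int,
                    candidate: Array[Float] => Seq[Long]): Boolean =
    left.forall(q => candidate(q).toSet == bruteForceTopK(q, right, k).toSet)
}
```

Comparing id sets rather than ordered lists tolerates equal-distance reordering inside the top-K; exact ties straddling the K-th position would still need a tolerance rule in a real test.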

Cluster — distributed Spark 3.5 on Kubernetes, tiny scale (single run, multi-tenant)

8 × 4 cores × 16 GB executor pods, scratch on shared file:// mount, |R|=100K |L|=100 dim=128 K=10:

config                                              median ms
────────────────────────────────────────────────────────────────
A: Spark crossJoin + min_by_k (parquet R)             154,963
B: temp Lance write + kNearestJoin (manual)             3,205     (183 ms temp write + 3,022 ms probe)
C: Lance-native R + kNearestJoin (reference)            1,568

Reads:

  • B beats A by 48× on a real distributed cluster.
  • (B − C) = 1,637 ms vs. local 56 ms. Cluster gap is bigger because (a) shared-storage write throughput is ~5× slower than local NVMe (183 ms vs. 36 ms for the same payload) and (b) the read-back path opens the dataset over the network and reads the manifest from shared storage (~1 sec on first probe). Still small relative to the brute-force baseline.
  • The cluster A baseline is 5.5× slower than local A (155 s vs. 28 s) because the cluster CPUs are slower and the cross-product shuffle has more overhead. The ordering still holds: B and C beat A by well over an order of magnitude (48× and ~99× respectively).
  • Multi-tenant ±20% noise envelope applies (per BENCHMARK_RESULTS.md § "Variance / multi-tenant noise"); the order-of-magnitude story is robust, the specific multiplier is not. This particular run was clean — within-config variance was ±1.4% (A), ±9% (B), ±3.5% (C), no noisy-neighbor signature.
  • Config D is missing from the cluster table: the bench code didn't pre-set spark.lance.knn.tempR.dir for the public-API path on the first cluster submission, and the helper's cluster-mode fail-fast caught it. The bench is fixed; numbers from a re-run with all four configs and the wider scale sweep will be added when the second submission completes.

Caveats

  • Single run on cluster. Multiple-iteration medians needed for headline numbers; the table above is a point estimate.
  • Cluster (B − C) overhead grows with R size because both temp-write and dataset-open costs are bandwidth-bound. For multi-GB R the gap is dominated by the temp write (predictable from LanceWriteBenchmark's ~3 GB/s effective write throughput on local NVMe → much lower on shared storage).
  • The existing IndexedNearestJoinBenchmark cluster numbers stand — they're for the probe pipeline only and don't depend on whether R came from temp or Lance-native. This issue's claim is narrower: that the temp-write step doesn't tank the headline.

Why a native parquet→Lance copier isn't the load-bearing piece

Earlier discussion considered a native (Rust) fast path that reads parquet files directly and writes Lance, bypassing JVM row materialization. The benchmark above answers whether this is worth it: at 3 GB/s effective write throughput, even the fat case is half a second of pure sink cost on local NVMe and that's already close to bandwidth-bound on the storage side. A native copier would speed up the JVM-mode case by maybe 1.5-2×, which is a 10-30% improvement on total query time at typical scales.
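The 10-30% figure follows from Amdahl-style arithmetic: if the write step takes a fraction f of total query time and a native copier speeds only that step up by s, total time shrinks by f·(1 − 1/s). A quick Scala check, where the write fractions are assumptions chosen for illustration rather than measured values:

```scala
object NativeCopierGain {
  // Fraction of total query time saved when a step taking `writeFraction` of it runs `speedup`x faster.
  def totalSaving(writeFraction: Double, speedup: Double): Double = {
    require(writeFraction >= 0 && writeFraction <= 1 && speedup >= 1)
    writeFraction * (1.0 - 1.0 / speedup)
  }

  def main(args: Array[String]): Unit =
    for (f <- Seq(0.3, 0.6); s <- Seq(1.5, 2.0))
      println(f"write share ${f * 100}%.0f%%, speedup $s%.1fx -> saves ${totalSaving(f, s) * 100}%.0f%%")
}
```

Under this model the quoted 10-30% corresponds to the write step being roughly 30-60% of total query time; that share is inferred from the arithmetic, not measured.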

Worth doing eventually as an optimization — and a useful general-purpose Lance feature ("fast Parquet ingest") that benefits more than just KNN — but not a prerequisite for the per-query temp story. JVM-mode write is fast enough.

Also, it doesn't help when R is a subplan, because then there are no parquet files to copy from in the first place.

Design

The Catalyst rule extension

Today the rule pattern-matches NearestByJoin on a Lance-scan-on-right. This issue widens that:

case nbj: NearestByJoin if nbj.approx =>
  val (left, right) = (nbj.left, nbj.right)
  right match {
    case lance: LanceDSv2Scan =>
      // existing path: probe directly against the Lance dataset
      buildExistingPlan(left, lance, ...)

    case other if shouldTempWrite(other) =>
      // NEW: materialize `other` to temp Lance, then probe against the temp.
      // parentPlan is the plan above nbj, tracked by the enclosing rule.
      val projection: Seq[NamedExpression] = extractProjectedCols(parentPlan, other)
      val withRid = Project(
        Alias(MonotonicallyIncreasingID(), "_rid")() +: projection,
        other)
      // Logical placeholder; planned into LanceTempWriteExec, whose output carries the temp URI.
      val tempUri = LanceTempWritePlaceholder(withRid)
      buildExistingPlan(left, tempUri, ridCol = "_rid", ...)

    case _ =>
      // fall through to Spark's brute-force RewriteNearestByJoin
      nbj
  }

shouldTempWrite is a cost gate: roughly, temp_write_estimated_time + probe_time < spark_brute_force_estimate. Defaults to "yes for non-trivial R sizes, no for tiny R where brute-force is already fast." Initial threshold based on stats from Statistics.sizeInBytes.
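The gate's intended comparison can be written as a one-line predicate. In this sketch the `Estimates` fields are hypothetical inputs (how each is estimated is not specified in this issue); the example plugs in the local tiny-scale medians from the end-to-end benchmark to show the shape of the decision.

```scala
object CostGateSketch {
  // Hypothetical per-query estimates, in milliseconds.
  final case class Estimates(tempWriteMs: Double, probeMs: Double, bruteForceMs: Double)

  // Take the temp-write path only if writing R plus probing beats Spark's brute-force plan.
  def shouldTempWrite(e: Estimates): Boolean =
    e.tempWriteMs + e.probeMs < e.bruteForceMs

  def main(args: Array[String]): Unit = {
    // Local tiny-scale medians from IndexedNearestJoinTempRBenchmark (config B split, config A).
    val localTiny = Estimates(tempWriteMs = 36, probeMs = 287, bruteForceMs = 28231)
    println(shouldTempWrite(localTiny)) // true
  }
}
```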

LanceTempWriteExec

A UnaryExecNode that:

  1. Reads its child's RDD[InternalRow] output
  2. Calls df.write.format("lance").save(tempUri) underneath, where tempUri = ${spark.lance.temp.dir}/${appId}/${queryId}/${side}
  3. Emits a single sentinel row containing the temp URI for downstream probe execs to consume

This is intentionally a thin wrapper over the existing lance-spark write path. No new write code; no JNI changes; no Lance-core changes.

The sentinel emission pattern is how the URI threads through Catalyst — the probe exec's inputRDDs() reads the URI from its child's output and opens the temp Lance dataset.

Rid synthesis

monotonically_increasing_id() produces 64-bit ids that are:

  • Unique across the partitions of one execution
  • Stable within a single execution (same call site, same partitions, same row order)
  • Not stable across separate executions

For per-query temp this is exactly right:

  • One LanceTempWriteExec execution = one rid space
  • Materialize-time point-fetches use the same temp Lance, so rids match
  • Cleanup deletes the temp; rids aren't needed cross-query
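Why the ids are unique across partitions yet tied to one execution is easiest to see from the layout Spark documents for monotonically_increasing_id: the partition index in the upper 31 bits and the record number within that partition in the lower 33 bits. The Scala below mirrors that documented layout for illustration only; it is not Spark's code, and Spark describes the layout as a property of the current implementation.

```scala
object RidLayout {
  private val RecordBits = 33
  private val RecordMask = (1L << RecordBits) - 1

  // Mirrors the documented layout: partition index in the upper 31 bits,
  // per-partition record number in the lower 33 bits.
  def rid(partitionIndex: Int, recordInPartition: Long): Long = {
    require(partitionIndex >= 0 && recordInPartition >= 0 && recordInPartition <= RecordMask)
    (partitionIndex.toLong << RecordBits) + recordInPartition
  }

  def partitionOf(rid: Long): Int = (rid >>> RecordBits).toInt
  def recordOf(rid: Long): Long = rid & RecordMask
}
```

Because a row's id depends on which partition it lands in and its position there, re-running R with different partitioning or row order assigns different ids to the same row. That is fine here because the rids are only consulted against the temp dataset written by the same LanceTempWriteExec execution.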

If R already has a user-supplied stable id column, the rule could detect that and reuse it instead of synthesizing — small future optimization, not required for v1.

Why not _metadata.row_index?

Spark 3.4+ exposes _metadata.row_index as a generated metadata column for parquet scans (SPARK-37980), giving each row its physical row index within its parquet file. Combined with _metadata.file_path it forms a stable (file, row_index) identifier that survives across executions.

That's the natural rid for the sidecar follow-up (out-of-scope item 1 below) where the goal is cross-query stability. It's not the right choice for per-query temp because:

  • Subplan R may not be backed by parquet files at all (Filter(Join(parquet, delta), ...) etc.) — there's no row_index to read
  • Per-query temp doesn't need cross-execution stability — the temp is built and consumed in the same execution, then deleted
  • monotonically_increasing_id works uniformly regardless of R's source

So this issue uses monotonically_increasing_id. The sidecar issue (filed separately) uses _metadata.row_index for parquet-backed R. Different rid mechanisms for different lifetimes.

Schema extraction (which columns to carry)

The rule walks the parent plan to determine which columns from R the output actually needs. Same machinery the existing rule uses for prefilter pushdown, extended:

  • Rid column: always carried
  • Vector column: always carried
  • All other R columns referenced by parent plan: carried
  • Columns NOT referenced by parent: dropped at write time (column pruning via Project injection above the LanceTempWriteExec)

For a wide R where the user only projects a few cols, the temp stays narrow. For SELECT *-style queries against wide R, the temp carries everything.

The "Lance write is fast" benchmark applies to the actually-carried columns: if R has 100 columns of which the query needs 5, only those 5 + rid + vec hit the temp.
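The carry rules reduce to a small ordered-set computation. This sketch works over plain column names, whereas the real rule works on Catalyst attributes; `carriedColumns` and its parameters are hypothetical.

```scala
object TempColumns {
  // Columns written to the temp Lance dataset: rid and vector always,
  // plus any R column the parent plan references (kept in R's column order).
  def carriedColumns(rColumns: Seq[String], referencedByParent: Set[String],
                     ridCol: String, vecCol: String): Seq[String] = {
    require(rColumns.contains(vecCol), s"vector column $vecCol not in R")
    val extras = rColumns.filter(c => c != vecCol && referencedByParent.contains(c))
    ridCol +: vecCol +: extras
  }
}
```

For example, with 100 payload columns in R and 5 of them referenced, the temp gets 7 columns; references to names that don't exist in R are simply ignored.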

Lifecycle

  • Scratch root: spark.lance.temp.dir, defaulting to a subdirectory of spark.local.dir for local mode and to a configured shared scratch path for cluster mode.
  • Per-query namespace: ${root}/${appId}/${queryId}/{left,right}/. Multiple concurrent KNN queries don't collide.
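  • Cleanup: a SparkListener.onJobEnd hook deletes the per-query directory after the materialize stage completes. This mirrors how Spark scopes its own shuffle scratch to the work that uses it, although Spark removes shuffle files through its ContextCleaner rather than a job-end hook.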
  • Janitor for crashes: a periodic sweep of ${root}/${appId}/ deletes leftovers from crashed jobs based on directory mtime. Configurable interval; defaults to "off" (manual cleanup acceptable in most clusters; can opt in if shared scratch needs aggressive cleanup).
  • Partial-failure recovery: if LanceTempWriteExec fails mid-write, Spark task retry handles it (same as any other write). If the whole job aborts after temp write but before materialize, onJobEnd still fires and cleans up. If the JVM dies hard, the janitor handles it.
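The namespace and the crash janitor can be sketched with java.nio alone. This is a hypothetical helper, not the planned TempLifecycleManager: it builds the per-query layout from the namespace bullet, deletes one tree the way the onJobEnd hook would, and sweeps per-query directories under ${root}/${appId}/ whose mtime is older than a cutoff.

```scala
import java.io.IOException
import java.nio.file.{FileVisitResult, Files, Path, SimpleFileVisitor}
import java.nio.file.attribute.BasicFileAttributes
import java.time.{Duration, Instant}
import scala.collection.mutable.ListBuffer

object TempScratch {
  // ${root}/${appId}/${queryId}/${side}: one directory per query side, so concurrent queries never collide.
  def sideDir(root: Path, appId: String, queryId: String, side: String): Path = {
    require(side == "left" || side == "right", s"unexpected side: $side")
    root.resolve(appId).resolve(queryId).resolve(side)
  }

  // Recursively delete one directory tree, e.g. a finished query's ${root}/${appId}/${queryId}.
  def deleteTree(dir: Path): Unit =
    if (Files.exists(dir)) {
      Files.walkFileTree(dir, new SimpleFileVisitor[Path] {
        override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
          Files.delete(file)
          FileVisitResult.CONTINUE
        }
        override def postVisitDirectory(d: Path, exc: IOException): FileVisitResult = {
          if (exc != null) throw exc
          Files.delete(d)
          FileVisitResult.CONTINUE
        }
      })
    }

  // Janitor sweep: delete per-query directories under appDir last modified before now - maxAge.
  // Returns the directories it deleted.
  def sweep(appDir: Path, maxAge: Duration, now: Instant): List[Path] =
    if (!Files.isDirectory(appDir)) Nil
    else {
      val cutoff = now.minus(maxAge)
      val stream = Files.list(appDir)
      val stale =
        try {
          val buf = ListBuffer.empty[Path]
          val it = stream.iterator()
          while (it.hasNext) {
            val q = it.next()
            if (Files.isDirectory(q) && Files.getLastModifiedTime(q).toInstant.isBefore(cutoff)) buf += q
          }
          buf.toList
        } finally stream.close()
      stale.foreach(deleteTree)
      stale
    }
}
```

One caveat for an mtime-based janitor: a directory's mtime changes only when entries directly inside it are added or removed, so writes deep under .../right/ don't refresh the per-query directory. The age cutoff should comfortably exceed the longest expected query.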

Plan shape

Project(user-output)
  └─ LanceMaterializeExec                ← unchanged
       └─ LanceMergeExec                 ← unchanged
            └─ Exchange(_leftId)         ← unchanged (existing AQE-visible shuffle)
                 └─ LanceProbeExec       ← unchanged (probes against temp URI)
                      ├─ LanceTempWriteExec(left)                ← NEW
                      └─ LanceTempWriteExec(right with rid)      ← NEW

df.explain() shows the temp-write nodes in the plan, with their child sub-DAGs visible. AQE engages on the merge shuffle as today. The existing 3-exec staged design and the references = child.outputSet invariant (see IMPL_PLAN.md § "3-exec staged split — root cause and fix") apply unchanged.

Implementation plan

Roughly 600 LoC across the lance-spark-knn modules. Sized to land as one PR (or two; the rule extension can be split from the exec node if helpful for review).

| Component | File | Approx LoC |
|---|---|---|
| LanceTempWriteExec + logical plan node | internal/staged/LanceTempWritePlan.scala | ~300 |
| Rule extension: detect non-Lance R, inject temp write | catalyst/IndexedNearestByJoinRule.scala | ~150 |
| Schema/projection extraction extension | catalyst/IndexedNearestByJoinRule.scala | ~100 |
| Lifecycle: SparkListener + cleanup | internal/TempLifecycleManager.scala | ~100 |
| Cost gate: temp-write-vs-brute-force decision | catalyst/IndexedNearestByJoinRule.scala | ~50 |
| Tests (correctness oracle, lifecycle, cost gate, subplan-R cases) | */test/... | ~400 |
| Docs update in DESIGN.md and NEARESTBYJOIN_ANN_PROPOSAL.md | docs | ~150 |

Cost gate v1 is intentionally crude: a single spark.lance.knn.tempWrite.minRightRows threshold (default 10K) below which the rule refuses to rewrite and falls through to Spark's brute-force. Smarter gating (size-based, with stats fallback) is a Phase 2 polish.
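The v1 gate reduces to one comparison on R's row count. In this sketch the count has the shape of Catalyst's optional Statistics.rowCount (an Option[BigInt]); what the rule should do when no count is available is not specified in this issue, so it is an explicit parameter here, and `passesV1Gate` is a hypothetical name.

```scala
object V1CostGate {
  // Default from the text: spark.lance.knn.tempWrite.minRightRows = 10K.
  val DefaultMinRightRows: Long = 10000L

  // Rewrite to the temp-write path only when R has at least minRightRows rows;
  // below that, fall through to Spark's brute-force plan.
  // `whenUnknown` decides the outcome when no row-count estimate exists (an assumption, not from the issue).
  def passesV1Gate(rowCount: Option[BigInt],
                   minRightRows: Long = DefaultMinRightRows,
                   whenUnknown: Boolean = true): Boolean =
    rowCount.fold(whenUnknown)(_ >= BigInt(minRightRows))
}
```

The unknown-count case matters more than it looks: subplan R often has no row-count estimate without CBO, so a conservative default would keep the new path from firing on exactly the inputs it targets.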

Test coverage

Plan to ship the following test cases (mirroring the structure of existing KNN tests):

| Test | What it pins |
|---|---|
| LanceTempWriteExecTest | LanceTempWriteExec writes a DataFrame to a temp Lance URI; rid column is unique per row; cleanup happens on job end. |
| IndexedNearestByJoinTempRTest | End-to-end against a parquet R; oracle equivalence with brute-force; rule fires (plan shape includes LanceTempWriteExec). |
| IndexedNearestByJoinSubplanRTest | R is a non-trivial subplan (filter + project + join); same oracle equivalence. |
| IndexedNearestByJoinTempRWidth*Test | R has wide projection / narrow projection; verify schema extraction trims unused cols at temp write. |
| IndexedNearestByJoinTempLifecycleTest | Concurrent queries don't collide on scratch dir; cleanup happens on each query independently; failed query doesn't leak temp dir indefinitely. |
| IndexedNearestByJoinCostGateTest | Tiny R bypasses the rule (falls through to Spark brute-force); large R triggers temp-write path. |

Builds on the existing IndexedNearestJoinCorrectnessTest brute-force oracle pattern.

Out of scope (follow-up issues)

The conversation that led to this issue covered several adjacent extensions worth noting but explicitly out of scope:

1. Cached / 1:1 file-pair Lance sidecar over parquet/delta R

A separate, more durable pattern for stable R (parquet or delta tables on storage, not subplans). One Lance sidecar file per parquet data file, with rid being (parquet_file_path, row_index) provided natively by Spark 3.4+ via the _metadata.row_index generated metadata column (SPARK-37980).

The natural rid for sidecar mode:

import org.apache.spark.sql.functions.col

df.select(
  col("_metadata.file_path"),
  col("_metadata.row_index"),   // stable across executions for one file
  col("vec"),
  ...
)

This sidesteps the rid-stability problem that monotonically_increasing_id has — row_index is the physical position within the parquet file, which is fixed for as long as the file exists. Survives parquet compaction by per-file rebuild (the file path changes, so the sidecar entry is invalidated and rebuilt). Supports lazy column accretion via Lance's add_columns. Integrates cleanly with delta deletion vectors (DVs apply on top, sidecar represents physical rows).
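The per-file invalidation described above can be sketched as set arithmetic over data-file paths: a sidecar keyed by _metadata.file_path is reusable only while that path still backs R, and compaction shows up as old paths disappearing and new ones appearing. A sketch with hypothetical names, assuming a sidecar is identified by its source file path alone (the staleness fingerprints mentioned below would add further checks):

```scala
object SidecarPlan {
  final case class Plan(reuse: Set[String], build: Set[String], drop: Set[String])

  // currentFiles: data-file paths backing R now; sidecars: paths that already have a Lance sidecar.
  def plan(currentFiles: Set[String], sidecars: Set[String]): Plan =
    Plan(
      reuse = currentFiles intersect sidecars, // file unchanged, so (file_path, row_index) rids still valid
      build = currentFiles diff sidecars,      // new or rewritten file (e.g. produced by compaction)
      drop  = sidecars diff currentFiles)      // source file gone, so its rids are dead
}
```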

Right design when:

  • R is a stable named table on storage (parquet or delta)
  • The same R gets queried repeatedly
  • Build cost amortizes over many queries

Strictly more complex than per-query temp (catalog hooks, staleness fingerprints, build triggers), and doesn't help when R is a subplan. Filed as a follow-up so the per-query path can ship first.

For pre-3.4 Spark or sources without row-index metadata, sidecar mode falls back to monotonically_increasing_id and accepts the rebuild-on-recompute cost.

2. Random-access reader for parquet

Read individual rows out of parquet files via row-group + page index + bloom filter primitives. An alternative to building a sidecar at all — materialize directly from parquet with point-fetches.

Reasonable in theory; the catch is that production parquet usually isn't written with the metadata that makes this fast (page indexes, bloom filters). Without those, every point-fetch decompresses whole row groups. Worth revisiting once parquet writers ship those features by default and existing data lakes have them.

Could also play a role as a "cold column" complement to the sidecar pattern — sidecar holds frequently-projected columns, rare projections fall through to random-access parquet. Promising mature-system shape; not load-bearing now.

3. Native Rust parquet→Lance copier

Discussed earlier in the conversation. The benchmark (LanceWriteBenchmark) showed that JVM-side df.write.format("lance") is already bandwidth-bound — a native copier would offer 10-30% on total query time at typical scales. Useful as a Lance-core feature ("fast Parquet ingest") beyond just KNN, but not required to ship the per-query temp story.

Belongs in lance-format/lance (RFC + Rust implementation) more than in this connector.

4. Velox / Gluten path

Velox PR #16556 adds a Lance read connector (read-only, vector ops deferred). lance-c Phase 2 adds vector ops to the C API, Phase 3 adds writes. End-to-end native KNN through Gluten + Velox + Lance is the long-term shape but is multi-year. The current per-query temp path runs entirely in Spark JVM and is the right thing to ship now.

When that ecosystem matures, the same Catalyst rule shape registers a different physical strategy (native instead of JVM); the logical plan contract is unchanged.

Asks

  1. Sanity check on the design before I send PRs — anything in the "alternative materialize options ruled out" reasoning that you'd push back on?
  2. Preferred home for LanceTempWriteExec: lance-spark-knn_2.12 alongside the staged execs (current plan), or somewhere more general in lance-spark-base so non-KNN code could potentially reuse it?
  3. Ship as one PR or split rule-extension and exec-node? My instinct is one PR since they're tightly coupled, but happy to split for review.
  4. Naming: LanceTempWriteExec vs LanceMaterializeForKnnExec vs something else. The "this is a query-scoped write" property is the load-bearing one.

