feat: external Lance vector index path for parquet/Delta R (Phase 1)#5

Draft
sezruby wants to merge 6 commits into knn-temp-r from knn-external-index

@sezruby sezruby commented May 22, 2026

Draft PR — additive on top of PR #3 (knn-temp-r branch).

Tracking issue with full RFC + cluster benchmark numbers + design discussion: #4.

Two-path design

PR #3 (per-query temp-Lance write) stays the general-purpose path — works on any DataFrame R including subqueries, projections, joins. This PR adds a specialized path for direct parquet/Delta scans:

| Path | Use case |
| --- | --- |
| Temp-Lance (#3) | Any DataFrame R: subqueries, projections, joins, arbitrary transformations |
| External-index (this PR) | Direct parquet/Delta scans where Lance can read the source files directly |

What this PR adds

Public API

IndexedNearestJoinExternal.apply(left, rightFilePaths, leftVecCol, rightVecCol, k, ...) — DataFrame API entry point. SQL Catalyst integration deferred to Phase 3 (same staging the original kNearestJoin shipped in).

Internal stages

  • ExternalIndexProbe — Scala wrapper around the new ExternalIvfPqIndex JNI handle (lance-core 7.1.0-beta.1, companion PR). Decodes Arrow IPC stream from fetchRows into Spark row maps.
  • ExternalIndexLifecycle — driver-side cache (sha256 keyed by file paths + col + params). Cleanup registered with the existing LanceTempLifecycle machinery.
  • ExternalFusedStage — single-stage probe + materialize. Lance returns global top-K per query already (no cross-task merge), so the leftId shuffle from the Lance-native pipeline is vestigial here. Fused stage opens the index once per task, probes every left row, batched fetchRows for the partition, emits final join Rows.
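As a rough illustration of the "sha256 keyed by file paths + col + params" cache described for ExternalIndexLifecycle, the sketch below derives a hex key from those three inputs. The object name, the `cacheKey` signature, the canonical string layout, and the choice to sort the file paths are all assumptions made for illustration; the PR does not show how the real key is assembled.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object IndexCacheKeySketch {
  /** Hex SHA-256 over the file paths, the vector column, and the index params.
    * Assumption: paths and params are sorted so the key is order-independent.
    */
  def cacheKey(filePaths: Seq[String], vecCol: String, params: Map[String, String]): String = {
    val canonical = (
      filePaths.sorted.map("file=" + _) ++
        Seq("col=" + vecCol) ++
        params.toSeq.sorted.map { case (k, v) => s"param.$k=$v" }
    ).mkString("\n")

    MessageDigest.getInstance("SHA-256")
      .digest(canonical.getBytes(StandardCharsets.UTF_8))
      .map(b => f"${b & 0xff}%02x") // two lowercase hex digits per byte
      .mkString
  }
}
```

With a key of this shape, two calls over the same files, column, and parameters resolve to the same cached index, while changing any one input produces a different key and therefore a fresh build.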

Benchmark

IndexedNearestJoinExternalBenchmark compares 5 configs on the SAME data in the SAME job:

  • A: Spark crossJoin + min_by_k brute-force (skipped on wide payload — pair count dominates regardless)
  • B-narrow / B-wide: per-query temp-Lance write + kNearestJoin (PR #3 path)
  • C-indexed-narrow / C-indexed-wide: Lance-native R + IVF-PQ index + kNearestJoin — apples-to-apples reference (same algorithm, R already in Lance)
  • E: this PR's IndexedNearestJoinExternal over the same parquet files
  • F: Spark MLlib BucketedRandomProjectionLSH baseline (gated behind BENCH_SKIP_LSH=true; rough estimates indicate 30-60s per query on 1M rows due to approxSimilarityJoin's O(|R|) shuffle cost; see issue #4 for the source-code analysis)

Headline numbers (cluster Spark 3.5)

K=10, |L|=100, dim=128, IVF=256, PQ=16:

| Scale | B-narrow | C-indexed-narrow* | E warm | E build (one-time) |
| --- | --- | --- | --- | --- |
| wide-tiny (\|R\|=100K, 16 cols) | 5,696 ms | (running) | 1,875 ms | 7,230 ms |
| wide-medium (\|R\|=1M, 16 cols) | 18,128 ms | (running) | 570 ms | 29,617 ms |
| wide-large (\|R\|=1M, 64 cols) | 17,187 ms | | 521 ms | 30,140 ms |

* Earlier benchmark used un-indexed Lance-native R as "C" (brute-force scan). Re-running with indexed C this round; numbers will be folded into issue #4 once the cluster job finishes.

E warm is 30-33× faster than B-narrow at scale. Build amortizes after ~2 queries vs B-narrow.
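The "amortizes after ~2 queries" figure can be checked with a small break-even calculation: the one-time build pays for itself once the per-query savings over B-narrow add up to the build cost. This is a sketch of that arithmetic only; the object and function names are illustrative and not part of the PR.

```scala
object AmortizationSketch {
  /** Smallest query count n such that build + n * warm <= n * baseline. */
  def breakEvenQueries(buildMs: Long, baselineMs: Long, warmMs: Long): Long = {
    require(baselineMs > warmMs, "warm path must be faster than the baseline")
    val savedPerQuery = baselineMs - warmMs
    (buildMs + savedPerQuery - 1) / savedPerQuery // integer ceiling
  }
}
```

Plugging in the wide-medium row (build 29,617 ms, B-narrow 18,128 ms, E warm 570 ms) gives 29,617 / 17,558 ≈ 1.7, which rounds up to 2 queries.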

Latent bug fix in LanceVectorIndexBuilder

Promoted from src/test/scala to src/main/scala/.../internal so the bench can use it. While there, fixed an arg-order bug — VectorIndexParams.ivfPq signature is (numPartitions, numBits, numSubVectors, ...) not (numPartitions, numSubVectors, numBits, ...). The existing test helper had them swapped; bug was masked because the test's defaults coincidentally set both to 8.
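To show why the swap went unnoticed, here is a minimal sketch using a stand-in case class rather than the real Lance type. `PqParams`, `correct`, and `swapped` are hypothetical names; only the (numPartitions, numBits, numSubVectors) order is taken from the description above.

```scala
object ArgOrderSketch {
  // Stand-in for illustration only: mirrors the parameter order quoted above,
  // not the actual VectorIndexParams class.
  final case class PqParams(numPartitions: Int, numBits: Int, numSubVectors: Int)

  // Helper that forwards its arguments in the order the target expects.
  def correct(numPartitions: Int, numSubVectors: Int, numBits: Int): PqParams =
    PqParams(numPartitions, numBits, numSubVectors)

  // Helper with the bug: the last two arguments are forwarded positionally
  // in the wrong order.
  def swapped(numPartitions: Int, numSubVectors: Int, numBits: Int): PqParams =
    PqParams(numPartitions, numSubVectors, numBits)
}
```

When both values are 8 the two helpers produce identical parameters, so a test with those defaults cannot tell them apart. A test that uses distinct values, such as 16 sub-vectors with 8 bits, would expose the swap immediately.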

Drive-by fix in IndexedNearestJoinTempRBenchmark

Pre-set spark.lance.knn.tempR.dir from BENCH_DATA_PATH in cluster mode so config D doesn't trip the helper's cluster-mode fail-fast guard. Cluster runs of the temp-R bench would otherwise need a separate env var.

What this PR does NOT include

  • SQL Catalyst integration (Phase 3 follow-up)
  • Persistent index across Spark sessions (Phase 2 follow-up — needs manifest fingerprint validation in lance-core)
  • HNSW / IVF-FLAT external builds (RFC Phase 2/3 in lance-core)

Status

DRAFT — for review of the design + integration approach. Companion lance-core PR at sezruby/lance#1 must land first (this depends on the new ExternalIvfPqIndex API).

References

Adds IndexedNearestJoinExternal alongside the existing IndexedNearestJoin (Lance R)
and the per-query temp-Lance path (#3). When R is a direct parquet/Delta scan,
build a Lance external IVF-PQ index over the source files and probe + refine +
post-topK materialize from those files — no temp Lance dataset, no column copies.

Sibling RFC + benchmark numbers:
  #4

== Files ==

internal/ExternalIndexProbe.scala
  Scala wrapper around the new ExternalIvfPqIndex JNI handle (in lance-core
  7.1.0-beta.1). Decodes Arrow IPC stream from fetchRows into Spark-compatible
  row maps.

internal/ExternalIndexLifecycle.scala
  Driver-side cache (sha256 keyed by file paths + col + params); cleanup hook
  registered with the existing LanceTempLifecycle so cluster scratch is cleaned
  on application end / JVM shutdown — same machinery as the temp-Lance path.

internal/ExternalFusedStage.scala
  Single-stage probe + materialize. Lance returns global top-K per query already
  (no cross-task merge needed), so the leftId shuffle from the Lance-native
  pipeline is vestigial here. Fused stage opens the index once per task, probes
  every left row, batched fetchRows for the partition, emits final join Rows.

internal/LanceVectorIndexBuilder.scala
  Promoted from src/test/scala — needed by IndexedNearestJoinExternalBenchmark
  to build IVF-PQ on Lance-native R for the C-indexed apples-to-apples reference
  config. Same body as the existing test helper.

  Note: fixes a latent arg-order bug in the existing test helper —
  VectorIndexParams.ivfPq signature is (numPartitions, numBits, numSubVectors,
  ...), not (numPartitions, numSubVectors, numBits, ...). The bug was masked
  in the existing test because its defaults set both args to 8 (swap invisible).

IndexedNearestJoinExternal.scala
  Public DataFrame API entry point. SQL Catalyst integration deferred to Phase 3.

benchmark/IndexedNearestJoinExternalBenchmark.scala
  Compares B-narrow / B-wide (temp-Lance, project rid only / +N payload) vs
  C-indexed-narrow / C-indexed-wide (Lance-native + IVF-PQ index, same projections)
  vs E (external-index over parquet). Reports E warm vs C-indexed (the apples-
  to-apples comparison: same algorithm, R lives in parquet vs Lance).

  Headline cluster numbers at wide-medium (|R|=1M, dim=128, 16 string payload
  cols, K=10, |L|=100):
    B-narrow:   18,128 ms
    E warm:        570 ms  (E build amortizes after ~2 queries vs B-narrow)

benchmark/IndexedNearestJoinTempRBenchmark.scala
  Drive-by fix: pre-set spark.lance.knn.tempR.dir from BENCH_DATA_PATH in
  cluster mode so config D doesn't trip the helper's cluster-mode fail-fast guard.

== Lance bump ==

pom.xml: lance.version 6.0.0-beta.4 → 7.1.0-beta.1 — required for the new
ExternalIvfPqIndex API. Companion lance-core PR:
  sezruby/lance#1

== Tests ==

IndexedNearestJoinExternalTest passes locally:
  16 left vectors × 2 parquet files of 320 rows; top-K returned per left;
  scores monotone; oracle hit rate above K/2 threshold.

sezruby added 2 commits May 22, 2026 07:38

Three fixes to the external-index path based on cluster benchmark findings.

== 1. Parallelism heuristic ==

ExternalFusedStage previously ran with `left.rdd.partitions.size` tasks. For
typical KNN workloads |L| is small (hundreds to thousands of queries) and Spark
right-sizes left to 1-2 partitions accordingly, leaving fetchRows running
serially on a 32-core cluster.

IndexedNearestJoinExternal.apply now sizes parallelism from the estimated row count of left:

  targetRowsPerTask = 100  // ~5-50ms per Lance search × 100 = 0.5-5 sec/task
  estimatedRows     = left.queryExecution.optimizedPlan.stats.rowCount
  sizedParallelism  = ceil(estimatedRows / targetRowsPerTask), capped at
                      defaultParallelism
  parallelism       = mergeParallelism.getOrElse(sizedParallelism)

Conditional repartition: only UP, never DOWN. Users with already-good
partitioning aren't slowed down.
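The sizing rule above can be sketched as a pair of pure functions. This is an approximation under stated assumptions, not the code in the PR: the row estimate is taken as a plain Long (Spark's plan statistics expose an optional BigInt), missing statistics are out of scope, and the cap is a parameter so the same sketch covers either upper bound. All names here are illustrative.

```scala
object ParallelismSketch {
  val TargetRowsPerTask: Long = 100L

  /** ceil(estimatedRows / TargetRowsPerTask), clamped to [1, cap]. */
  def sizedParallelism(estimatedRows: Long, cap: Int): Int = {
    require(cap >= 1, "cap must be positive")
    val wanted = (math.max(estimatedRows, 1L) + TargetRowsPerTask - 1) / TargetRowsPerTask
    math.min(wanted, cap.toLong).toInt
  }

  /** Partition count to use: an explicit override wins over the heuristic,
    * and the result never drops below the current partition count.
    */
  def targetPartitions(
      currentPartitions: Int,
      estimatedRows: Long,
      cap: Int,
      mergeParallelism: Option[Int]): Int = {
    val parallelism = mergeParallelism.getOrElse(sizedParallelism(estimatedRows, cap))
    math.max(currentPartitions, parallelism) // repartition only UP, never DOWN
  }
}
```

For example, an estimate of 10,000 left rows asks for 100 tasks and is clamped to 32 under a cap of 32, while a DataFrame that already has 64 partitions is left alone.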

Also adds diagnostic prints so benchmarks see what parallelism the path picked.

Cluster impact at wide-medium (|R|=1M, |L|=100, 32-core cluster):
  Before: 1-2 tasks → ~4,300 ms warm
  After:  32 tasks  → ~600 ms warm  (7× speedup)

== 2. Bench scratch cleanup ==

Cluster scratch volumes accumulate 5-10 GB per bench run and eventually trip
"Disk quota exceeded" mid-write. Bench now sweeps stale `knn-bench-data-*`
siblings from dataRoot.getParent at start. Strict pattern match (only matches
the cpd-submit-bench naming), skips this run's own dataRoot.
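A minimal sketch of the selection step in that sweep, kept as a pure function over directory names so it can be reasoned about without touching the filesystem. The regex suffix (`.+`) and the function name are assumptions; the commit only states that the match is strict on the `knn-bench-data-*` naming and that the current run's directory is skipped.

```scala
object ScratchSweepSketch {
  // Assumed shape of the strict pattern: the fixed prefix plus a non-empty suffix.
  private val BenchDir = "knn-bench-data-.+".r

  /** Sibling directory names that are safe to delete: full-name matches only,
    * excluding this run's own data root.
    */
  def staleSiblings(siblingNames: Seq[String], ownName: String): Seq[String] =
    siblingNames.filter { name =>
      BenchDir.pattern.matcher(name).matches() && name != ownName
    }
}
```

Matching the whole name rather than a substring is what keeps unrelated directories in the same parent out of the sweep.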

== 3. mega-medium opt-in ==

`mega-medium` scale (|R|=10M) is now documented as requiring substantial scratch
volume; default scale lists exclude it.

Cluster bench results at wide-medium (medians across 4 runs):
  B-narrow (temp-Lance):       16,506 ms
  C-indexed-narrow (Lance-R):     545 ms
  E warm (external-index):        609 ms
  E build (one-time):          29,529 ms

E warm comparable to C-indexed-narrow (both sub-second); E ~27× faster than
B-narrow. E build amortizes after ~2 queries vs B-narrow.

Two improvements driven by mega-medium cluster runs.

== 1. BENCH_CONFIGS env var ==

The bench previously ran A+B+C+E+F unconditionally. At mega-medium scale
(|R|=10M) writing the Lance-native R datasets for C alone takes ~5+ minutes,
so running the full suite is 30+ min. BENCH_CONFIGS=b-narrow,e (or any subset)
selects which configs run. Default remains all configs when unset.

Lance-native R writes + IVF-PQ index build for C are gated behind c-narrow /
c-wide activation so they're skipped entirely when not needed.
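The selection logic might look roughly like the sketch below. The config names `b-narrow`, `e`, `c-narrow`, and `c-wide` come from the text above; the remaining names in `AllConfigs`, the trimming and lower-casing, and the function names are assumptions for illustration.

```scala
object BenchConfigsSketch {
  // Assumed full set; only some of these names appear in the commit message.
  val AllConfigs: Set[String] =
    Set("a", "b-narrow", "b-wide", "c-narrow", "c-wide", "e", "f")

  /** Parse a comma-separated BENCH_CONFIGS value; unset or blank means all configs. */
  def selected(raw: Option[String]): Set[String] =
    raw
      .map(_.split(',').map(_.trim.toLowerCase).filter(_.nonEmpty).toSet)
      .filter(_.nonEmpty)
      .getOrElse(AllConfigs)

  /** The Lance-native R write and IVF-PQ build are only needed for the C configs. */
  def needsLanceNativeR(configs: Set[String]): Boolean =
    configs.exists(c => c == "c-narrow" || c == "c-wide")
}
```

A caller would pass `sys.env.get("BENCH_CONFIGS")` to `selected` and consult `needsLanceNativeR` before doing any of the expensive C-side preparation.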

== 2. Cap parallelism at spark.sql.shuffle.partitions ==

Heuristic was capping at defaultParallelism. cpd-submit-bench.sh sets
spark.sql.shuffle.partitions=128 (above 32-core defaultParallelism), and the
cap should match Spark's convention for shuffle parallelism — capping at
shuffle.partitions gives finer task granularity for large |L|.

Cluster mega-medium results (medians across 2 runs):
  B-narrow:           ~166s  (3 min target hit; temp-Lance write of ~5GB)
  C-indexed-narrow:    ~13s
  E warm:              ~26s
  E build (one-time):  ~98s

E vs B-narrow: 6.4× speedup. C beats E by ~2× but requires ~82s of pre-work
(write Lance dataset + build index) outside the timing loop, plus ~5GB extra
storage.

Adds LanceParquetIndex, a SparkSession-aware wrapper around ExternalIvfPqIndex
that exposes single-query and small-batch retrieval shapes (RAG / point-lookup
KNN) alongside the existing IndexedNearestJoinExternal bulk join.

The Rust core and JNI surface already supported single-query search; this
commit exposes it from Scala without forcing callers through kNearestJoin
with a 1-row left DataFrame (which is bounded below by Spark task launch
latency).

Surface:
  - LanceParquetIndex.build / buildIfMissing / open
  - search(query, k, ...) — driver-side, returns Seq[SearchResult]
  - searchToDF(query, k, projection) — 1-partition DataFrame for pipeline use
  - fetchRowsToDF(refs, projection) — random-access fetch with payload
  - buildIfMissing reuses ExternalIndexLifecycle's content-hash cache so
    the index file is shared with the bulk-join path

LanceParquetIndexTest covers JNI round-trip, DataFrame schema shape,
projection materialization, fetchRows ordering, and cache reuse. Recall is
tested in the Rust-side external_index_phase1.rs; here we focus on what the
wrapper itself can break.

sezruby commented May 27, 2026

Added the driver-side single-query API per feedback on SPIP #7 § 4.6.

New surface (6c5fef9):

  • LanceParquetIndex — Scala wrapper around ExternalIvfPqIndex, with build / buildIfMissing / open / search / searchToDF / fetchRowsToDF
  • LanceParquetIndexTest — 5 tests, all green (JNI round-trip, DataFrame schema shape, projection materialization, fetchRows ordering, cache reuse)

Three caller patterns now end-to-end:

  1. Non-Spark JVM: org.lance.index.external.ExternalIvfPqIndex (already shipped Phase 1, no Spark dep)
  2. Spark, single query: driver-side LanceParquetIndex.searchToDF(qvec, k, projection) returning a DataFrame for downstream Spark composition (this commit)
  3. Spark, many queries: existing IndexedNearestJoinExternal / df.kNearestJoin(corpus, ...)

buildIfMissing reuses the same ExternalIndexLifecycle content-hash cache as the bulk-join path, so the on-disk index file is shared across all three patterns within one Spark application.

Updated SPIP §4.6 to reflect shipped status: #7 (comment)

sezruby and others added 2 commits May 28, 2026 14:45

Adds ExecutorCpuCheck.run() which executes a fixed-cost CPU loop on every
task slot, groups results by executorId, and prints a per-executor median /
min / max table sorted slow-to-fast. The probe runs once before the first
scale and answers the question "are these executors actually free?" — which
defaultParallelism alone cannot.

Output highlights the slowest/fastest ratio. ≤1.25× is uniform; ≥1.5×
prints a warning that bench medians are under contention. With
BENCH_CPU_CHECK_FAIL_RATIO set the probe throws if the ratio exceeds the
threshold, refusing to run a noisy bench.

Skip with BENCH_CPU_CHECK_SKIP=true when the bench is being used for
something other than measuring (smoke tests, debugging).
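The aggregation side of the probe (per-executor median, slowest/fastest ratio, and the thresholds) can be sketched without Spark. This assumes timings arrive as (executorId, milliseconds) pairs; the names, the Long millisecond unit, and the "borderline" label for ratios between 1.25× and 1.5× are illustrative choices, since the commit does not say how that middle band is reported.

```scala
object CpuCheckSketch {
  /** Median of a non-empty sample of task timings. */
  def median(xs: Seq[Long]): Double = {
    require(xs.nonEmpty, "need at least one sample")
    val s = xs.sorted
    val mid = s.length / 2
    if (s.length % 2 == 1) s(mid).toDouble else (s(mid - 1) + s(mid)) / 2.0
  }

  /** Slowest executor median divided by fastest executor median. */
  def slowFastRatio(samples: Seq[(String, Long)]): Double = {
    require(samples.nonEmpty, "need at least one sample")
    val medians = samples.groupBy(_._1).values.map(ts => median(ts.map(_._2))).toSeq
    medians.reduce(_ max _) / medians.reduce(_ min _)
  }

  def verdict(ratio: Double): String =
    if (ratio <= 1.25) "uniform"
    else if (ratio >= 1.5) "contended"
    else "borderline" // assumed label; the source leaves this band unspecified

  /** True when a configured fail ratio is exceeded and the bench should refuse to run. */
  def exceeds(ratio: Double, failRatio: Option[Double]): Boolean =
    failRatio.exists(ratio > _)
}
```

Using the median per executor rather than the mean keeps a single slow task slot from making an otherwise healthy executor look contended.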

Existing benchmark output gains a new section before any timing happens, so
post-hoc readers can see whether the numbers were collected on a quiet
cluster or a contended one.

Makes IndexedNearestJoinExternalBenchmark runnable on cloud-storage data
roots (abfss://, s3://, ...) and as a Databricks spark_jar_task without
killing the host SparkSession. Validated end-to-end on Databricks DBR 16.4
LTS against an abfss-backed wide-tiny scale.

What changed and why:

- Cloud-aware listParquetFiles: dispatch on URI scheme. file:// and bare
  paths use java.nio (cheaper, no Hadoop init); other schemes route
  through Hadoop FileSystem so the bench can scratch onto cloud storage
  without changing the bench's contract that listParquetFiles returns
  Strings the external-index API can consume.

- Cloud-scheme guard in cleanupSiblingScratchDirs. The sibling sweep is a
  local-fs convenience (java.nio.Files.list); on cloud URIs it would
  either no-op silently or, worse, throw on Paths.get of a URI scheme it
  doesn't know. Skip with a clear message and let storage lifecycle /
  blob TTL handle cloud cleanup.

- deletePathIfExists helper before Lance writes. Spark's Lance datasource
  doesn't auto-overwrite even when mode("overwrite") is set — re-running
  against the same data root trips
  TableAlreadyExistsException on the first Lance write and
  NoSuchTableException if mode("overwrite") tries to drop-then-create on
  a fresh path. Pre-deleting via Hadoop FileSystem.delete is the
  scheme-agnostic fix that works on file://, abfss://, s3://, etc.

- parquet write parallelism scales with |R|. Old behavior: coalesce(1).
  At wide-tiny that's fine; at huge-medium (50M rows × dim=128 + payload)
  it serializes 70+ GB through one executor and runs >40 minutes. New
  behavior: max(1, numR/1M) parts so each parquet file is ~1M rows,
  letting executors write in parallel.

- huge-medium scale (|R|=25M, |L|=10000) added. Stresses the C-indexed-
  narrow vs E comparison at a regime where Path A's distributed merge
  has more fragments to spread probes across; previously bench topped
  out at mega-medium (|R|=10M).

- c-distributed-narrow config wires probeParallelism to a non-trivial
  value so Path A actually exercises its multi-stage merge path. Without
  this, C-indexed runs with probeParallelism=1 and the "cross-fragment
  merge" advantage we wanted to measure is invisible.

- sparkAlreadyRunning detection around spark.stop(). When running as a
  Databricks JAR task, the SparkSession is shared with the host REPL.
  Calling spark.stop() in finally tears down the host session, the
  bench's own summary print falls into a buffer that's torn down with
  it, and the run shows FAILED with no output even though all
  measurements completed. Fix: only stop the session we created.

- Bundle Netty in the fat jar. With spark.executor.userClassPathFirst,
  relying on Spark's shipped Netty triggers a cross-classloader
  IllegalAccessError on UnsafeDirectLittleEndian (Spark loads its Netty
  on AppClassLoader; the user-jar child loader sees a different one).
  Bundling produces a self-contained classpath that doesn't depend on
  the cluster honoring userClassPathFirst.
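Two of the changes above reduce to small pure decisions: whether a data root is local or needs Hadoop FileSystem, and how many parquet parts to write for a given |R|. The sketch below illustrates both under stated assumptions: the names are hypothetical, the scheme check uses java.net.URI (which rejects paths containing spaces or other illegal characters), and the ~1M rows-per-file target is the only number taken from the text.

```scala
import java.net.URI

object CloudBenchSketch {
  /** Bare paths and file:// URIs are treated as local; any other scheme
    * (abfss, s3, ...) would be routed through Hadoop FileSystem.
    */
  def isLocalPath(path: String): Boolean =
    Option(URI.create(path).getScheme).forall(_.equalsIgnoreCase("file"))

  /** max(1, numR / 1M) output parts, so each parquet file holds about 1M rows. */
  def parquetWriteParts(numR: Long): Int =
    math.max(1L, numR / 1000000L).toInt
}
```

The same local-versus-cloud check would also serve the sibling-sweep guard, since that cleanup is described as a local-filesystem convenience that is skipped on cloud URIs.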

Test footprint: existing local Spark tests still pass; Databricks DBR
16.4 LTS smoke run (run_id 975303254574346) completed end-to-end on
wide-tiny against abfss with C-indexed-narrow and E configs. C-indexed
built the index in 26.6s and the warm kNJ ran in 1.9s; E built in 12s
and warm-ran in 7.7s.