feat(knn): per-query temp Lance write for non-Lance R (issue #2)#3
Draft
sezruby wants to merge 7 commits into
Draft
feat(knn): per-query temp Lance write for non-Lance R (issue #2)#3sezruby wants to merge 7 commits into
sezruby wants to merge 7 commits into
Conversation
Three-config benchmark validating the per-query temp Lance design from #2: same data, same job, three execution paths. A: Spark crossJoin + min_by_k on parquet R (brute-force baseline) B: per-query temp Lance write + kNearestJoin against the temp URI C: Lance-native R + kNearestJoin (already-Lance reference) (B - C) = pure temp-write overhead. (B vs A) = headline speedup vs the naive parquet-R approach. Tiny scale local (M5 Max, 5 repeats): A 28,231 ms B 323 ms (36 ms tw + 287 ms probe) C 267 ms B beats A 87x; (B - C) overhead = 56 ms. Cluster mode supported via BENCH_CLUSTER_MODE=true + BENCH_DATA_PATH; cluster numbers blocked on infra (OpenShift CSI / PVC reconciler) and will be added as a follow-up. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…on-Lance R Per #2 stage 1: helper for materializing an arbitrary DataFrame to a temp Lance dataset before the existing indexed NearestByJoin pipeline. R can be parquet, delta, in-memory, or the result of an arbitrary upstream Spark plan; the helper writes it once and returns a URI the existing LanceProbeStage / LanceMaterializeStage consume unchanged. - LanceTempR.materialize(right, vecCol, projection, scratchDir): String Synthesises a unique _rid via monotonically_increasing_id(), projects rid + vec + caller-requested payload cols, writes to a unique sub-path under scratchDir. - LanceTempR.resolveScratchDir(spark): String Reads spark.lance.knn.tempR.dir; in cluster mode (master != local*), requires it to be set so the temp lands on a path every executor can see (s3://..., hdfs://..., file:///shared-mount/...). Local mode falls back to spark.local.dir + /lance-temp-r. Validation: - Round-trip: row count + rid uniqueness + vector column equality - Projection: temp schema is exactly rid + vec + requested cols - Subplan-backed sources (Filter+Project chain over parquet): same shape - Empty source: empty Lance dataset, no error - Validation: missing vec, unknown projection, reserved rid name → fail fast - resolveScratchDir: conf-key honoured; local-mode fallback writes correctly 71/71 tests pass (60 existing + 11 new). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Per #2 stage 2: the df.kNearestJoin(rightDf, ...) extension now accepts any DataFrame on the right side, not only Lance scans. When the right side is not a Lance scan, the extension materializes it once via LanceTempR.materialize() and routes the existing probe pipeline against the temp URI. Same data path on the wire. Behavior change: Before: parquet / in-memory / subplan R → IllegalArgumentException After: same inputs → temp Lance materialization → indexed kNN works Lance scans still take the existing fast path (no temp write). extractLanceUri now returns Option[(String, Option[Long])] instead of throwing on miss; callers fall through to materializeNonLanceR which: - Calls LanceTempR.resolveScratchDir to find a writable scratch dir (spark.lance.knn.tempR.dir is required in cluster mode; local mode falls back to spark.local.dir) - Materializes via LanceTempR.materialize with the user-specified rightProjection (or all of R's non-vector columns if rightProjection is None) Tests: replaced the three "throws on non-Lance R" cases with three positive oracle-equivalence tests covering parquet R, subplan-backed R (parquet → Filter → Project), and in-memory + alias-wrapped R. Lance-scan happy path and Filter-on-Lance unchanged. 71/71 tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tasets Per #2 stage 3: without lifecycle management, every kNearestJoin against a non-Lance R leaks a Lance dataset on whatever scratch storage spark.lance.knn.tempR.dir points at — local FS, S3, HDFS, ABFS — until the JVM dies. This commit adds: - LanceTempLifecycle.register(spark, tempUri) Tracks the URI for cleanup. Idempotent (dedupes via LinkedHashSet). Invoked automatically from LanceTempR.materialize at the end of every successful write. - SparkListenerApplicationEnd cleanup path Per-app SparkListener; on application end, deletes all registered URIs for that app. Routes through Hadoop FileSystem.get(uri, conf) so it handles local/s3/hdfs/abfs uniformly. Best-effort: errors are logged and swallowed so cleanup can't break the user's session teardown. - JVM shutdown-hook fallback Single hook installed once per JVM, runs every app's cleanup on Runtime.shutdown — covers crashes / hard kills. Why not onJobEnd: a single kNearestJoin invocation runs multiple Spark jobs (write + probe + merge + materialize). onJobEnd would race the still-running probe and break correctness. onApplicationEnd is the right scope. Tests (6 cases): explicit cleanup deletes from disk, multi-URI cleanup, idempotent registration, SparkListenerApplicationEnd-triggered cleanup, deleteUri on non-existent path is a no-op, deleteUri null/empty is a no-op. 77/77 tests pass overall (71 + 6 new). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…Lance Per #2 stage 4: extend IndexedNearestByJoinRule (Spark 4.2 SQL path) so a NearestByJoin whose right side isn't a Lance scan can also be rewritten to the indexed path. The rule materializes the right plan to a temp Lance dataset at rule-application time via LanceTempR.materialize, then proceeds with the same staged-plan rewrite as for Lance R. - New conf TempRForSqlEnabledConfKey = "spark.lance.knn.tempRForSqlRule.enabled". Off by default. Two reasons it's separate from the main enabled flag: 1. The rule evaluates the right plan synchronously at analysis time — users should consciously accept the cost 2. Cluster mode requires spark.lance.knn.tempR.dir to be set; surfacing the failure behind an explicit opt-in is friendlier than failing on every NearestByJoin - rewriteIfApplicable's for-comprehension changes: Recognize ranking BEFORE attempting right-side resolution so we don't pay a temp materialization for queries that fall through anyway (wrong direction, mixed-side rank expression, etc.). unwrapLanceScan(right).orElse { if (tempRForSqlEnabled) materializeNonLanceR(right, rightVecCol) else None } - materializeNonLanceR: Wraps the right plan as a DataFrame via LanceKnnDatasetBridge.asDataFrame, calls LanceTempR.materialize with right.output.map(_.name) as projection (carry every right-side attribute the parent plan can reference), and synthesises a LanceScanInfo whose `output` reuses right.output's AttributeReferences so the top-level Project(j.output, ...) stays resolved. Any failure → return None and fall through to brute-force. Tests (3 new, 17 total in IndexedNearestByJoinRuleTest): - testTempRForSqlRewritesNonLanceR: parquet R + both flags on → rewrites to Project(LanceMaterialize(...)) - testTempRForSqlRequiresMainEnabledFlag: both flags must be on; the temp-R flag alone doesn't fire the rule - testNonLanceRWithoutTempRConfFallsThrough: pins existing behavior — without the temp-R conf, parquet R falls through 77/77 tests pass in lance-spark-knn_2.12, 20/20 in lance-spark-knn-4.2_2.13. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Config D (`kNearestJoin(parquetDf, ...)`) exercises the full public-API code
path added in stages 2-4: the extension internally hits
LanceKnnImplicits.materializeNonLanceR -> LanceTempR.materialize -> existing
probe pipeline.
Local validation, M5 Max, tiny scale (3 reps + 1 warmup):
A: Spark crossJoin + min_by_k (parquet R) 28,000 ms 1.0×
B: temp Lance write + kNearestJoin (manual) 322 ms 86.9×
C: Lance-native R + kNearestJoin (reference) 261 ms 107.3×
D: kNearestJoin(parquetDf) — built-in temp 319 ms 87.8×
(D - B) = 3 ms — within run-to-run noise. The public API does the same work
as the manually-spelled-out B, no extra overhead.
The new explain(extended=true) dump (head-scale only) confirms:
- Probe and Materialize URI both point at the temp Lance dir
- Full LanceProbe -> Exchange -> LanceMerge -> LanceMaterialize chain in
the executed plan
- Wrapped by AdaptiveSparkPlan (AQE-visible merge shuffle)
- Left side is unmodified (only R goes through temp materialization)
Lifecycle: zero leakage observed. Earlier-test runs from before stage 3
left orphaned temp dirs in spark.local.dir/lance-temp-r/ (no lifecycle
existed yet to clean them); fresh runs of LanceTempRTest +
LanceTempLifecycleTest after stage 3 produce delta=0 in that directory.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two improvements per review feedback on #2: 1. Schema validation before triggering the temp Lance write LanceTempR.checkSupported(schema) returns Some(reason) when any column in the projected schema (rid + vec + payload) is not Lance-writable. The conservative allow-list covers numerics, boolean, string, binary, date, timestamp, struct (recursive), array (recursive). Rejects MapType, NullType, and unrecognised types with a clear "column X has type Y" message. Caller-specific behaviour: - DataFrame API (kNearestJoin): LanceTempR.materialize throws IllegalArgumentException, surfaces to the user. They asked for it explicitly so a clear failure is the right answer. - Catalyst rule (SQL APPROX NEAREST): the rule's materializeNonLanceR calls checkSupported BEFORE doing any work and returns None on miss, making the rule fall through to Spark's brute-force RewriteNearestByJoin — the user's query still runs, just slowly. Same "refusal not partial" pattern as the existing prefilter-pushdown. 2. Same-path regression test in LanceKnnImplicitsTest testProbeAndMaterializeShareSameTempUri walks the analyzed plan of a kNearestJoin against a non-Lance R, finds the LanceProbeLogicalPlan and LanceMaterializeLogicalPlan nodes, and asserts both stage configs reference the SAME temp Lance URI. Future regressions where helper / implicits / IndexedNearestJoin.apply diverge produce a fast structural failure instead of silent wrong results from a probe-vs-materialize URI mismatch. Tests added (8 new): - 4 in LanceTempRTest: checkSupported on common types accepts; rejects Map; rejects array-of-Map (recursive); rejects struct-with-Map (recursive); materialize() throws on unsupported projection - 2 in LanceKnnImplicitsTest: testProbeAndMaterializeShareSameTempUri, testKNearestJoinRejectsUnsupportedColumnType - 1 in IndexedNearestByJoinRuleTest: testTempRForSqlFallsThroughOnUnsupportedSchema 84/84 in lance-spark-knn_2.12 (was 77; 4 + 2 = 6 new — note 1 existing test was left intact, so net is +7 not +8). 21/21 in lance-spark-knn-4.2_2.13 (was 20). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This was referenced May 22, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Implements #2 — generalize the indexed
NearestByJoinpath to non-Lance right sides via per-query temp Lance materialization.Summary
Today the indexed pipeline only fires when R is a Lance scan. With this PR, any DataFrame on the right side works — parquet, delta, in-memory, the result of an arbitrary upstream Spark plan. The right side is materialized once via
df.write.format("lance").save()to a temp Lance dataset, then the existing probe/merge/materialize pipeline runs against the temp URI. Same data path on the wire; same Catalyst-visible plan shape.Design
See issue #2 for the materialize-options analysis (M1 carry-in-temp vs M2 hash-join vs M3 custom-shuffle) — only M1 handles the subplan-R case, which is the load-bearing constraint.
Stages (5 commits)
caa3ad5IndexedNearestJoinTempRBenchmark(3-config end-to-end bench)b904365LanceTempR.materialize+resolveScratchDirhelper91e25efkNearestJoinextension transparently handles non-Lance R5adb387LanceTempLifecyclequery-scoped cleanup6b6ea74IndexedNearestByJoinRuleextension for non-Lance RTest status
lance-spark-knn_2.12: 77/77 pass (60 existing + 17 new)lance-spark-knn-4.2_2.13: 20/20 pass (17 existing + 3 new)Configuration
spark.lance.knn.tempR.dir— scratch path for temp Lance datasets. Required in cluster mode (any path every executor can read+write —s3://,abfss://,file:///shared-mount/...,hdfs://...). Local mode falls back tospark.local.dir.spark.lance.knn.tempRForSqlRule.enabled— opt-in for the SQL rule's temp-R path (the DataFramekNearestJoinextension does it automatically; the SQL path is gated separately becauseCatalyst rule → eager writeis unusual).Local validation (M5 Max, tiny scale)
From issue #2:
(B − C) = 56 ms is the pure cost of the temp path versus already-Lance R. Cluster numbers are blocked on infra (OpenShift CSI / PVC reconciler errors); will be added once the cluster is unstuck.
Out of scope (issue #2 follow-ups)
_metadata.row_indexfor cross-execution rid stability)Ready for
Initial review of the design choices and the rule extension's
materializeNonLanceRshape. Once approved, ready to mark non-draft and squash for upstream submission per the existing 7-PR plan.