Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,48 @@ public class LanceSparkReadOptions implements Serializable {
public static final String CONFIG_TOP_N_PUSH_DOWN = "topN_push_down";

public static final String CONFIG_NEAREST = "nearest";

/**
* Whether executors should rebuild the namespace client and re-fetch storage options via {@code
* namespace.describeTable()} when opening a dataset for fragment scans.
*
* <p>When {@code true} (the default), executors reconstruct the namespace client and route the
* dataset open through the namespace path. This keeps the Rust-side storage-options provider
* attached so that short-lived vended credentials returned by {@code describeTable()} (e.g. STS
* tokens from Iceberg REST, Polaris, Unity) can be refreshed mid-scan.
*
* <p>When {@code false}, executors open the dataset directly by URI using the storage options the
* driver already obtained (passed in via {@code initialStorageOptions}). This skips the eager
* {@code describeTable()} RPC on every fragment scan, which is required for catalogs whose
* backing service authenticates per-call (e.g. Hive Metastore over Kerberos): executors typically
* do not have a Kerberos TGT and the call would otherwise fail with {@code GSS initiate failed}.
*
* <p>Whether disabling this option actually costs anything depends on the namespace impl:
*
* <ul>
* <li>{@code Hive2Namespace} / {@code Hive3Namespace}: {@code describeTable()} returns only the
* table location, never storage options. The refresh callback is a no-op, so setting this
* option to {@code false} has no downside. The underlying object-store credentials (e.g.
* IAM-role / {@code hive-site.xml} / env-vars on the executor) are rotated by the storage
* client SDK independently of Lance.
* <li>{@code GlueNamespace}: storage options come from a static {@code
* config.getStorageOptions()} and are typically not time-bound; setting {@code false} is
* usually safe unless you rely on LakeFormation-vended temporary credentials.
* <li>{@code IcebergNamespace} (REST), {@code PolarisNamespace}, {@code UnityNamespace}: {@code
* describeTable()} commonly returns vended temporary credentials. Leave this option at the
* default ({@code true}) unless every scan is guaranteed to finish within the credential
* TTL.
* </ul>
*/
public static final String CONFIG_EXECUTOR_CREDENTIAL_REFRESH = "executor_credential_refresh";

public static final String LANCE_FILE_SUFFIX = ".lance";

private static final boolean DEFAULT_PUSH_DOWN_FILTERS = true;
// Changed from 512 to 8192 for better OLAP scan performance (33x improvement)
private static final int DEFAULT_BATCH_SIZE = 8192;
private static final boolean DEFAULT_TOP_N_PUSH_DOWN = true;
private static final boolean DEFAULT_EXECUTOR_CREDENTIAL_REFRESH = true;

private final String datasetUri;
private final String dbPath;
Expand All @@ -88,6 +124,12 @@ public class LanceSparkReadOptions implements Serializable {
/** The catalog name for cache isolation when multiple catalogs are configured. */
private final String catalogName;

/**
* Whether executors should rebuild the namespace client for credential refresh. See {@link
* #CONFIG_EXECUTOR_CREDENTIAL_REFRESH} for details.
*/
private final boolean executorCredentialRefresh;

private LanceSparkReadOptions(Builder builder) {
this.datasetUri = builder.datasetUri;
String[] paths = extractDbPathAndDatasetName(datasetUri);
Expand All @@ -105,6 +147,7 @@ private LanceSparkReadOptions(Builder builder) {
this.namespace = builder.namespace;
this.tableId = builder.tableId;
this.catalogName = builder.catalogName;
this.executorCredentialRefresh = builder.executorCredentialRefresh;
}

/** Creates a new builder for LanceSparkReadOptions. */
Expand Down Expand Up @@ -239,6 +282,15 @@ public String getCatalogName() {
return catalogName;
}

/**
* Returns whether executors should rebuild the namespace client and route the dataset open
* through the namespace path (for credential refresh). See {@link
* #CONFIG_EXECUTOR_CREDENTIAL_REFRESH}.
*/
public boolean isExecutorCredentialRefresh() {
return executorCredentialRefresh;
}

public boolean hasNamespace() {
return namespace != null && tableId != null;
}
Expand Down Expand Up @@ -275,6 +327,7 @@ public LanceSparkReadOptions withVersion(int newVersion) {
.namespace(this.namespace)
.tableId(this.tableId)
.catalogName(this.catalogName)
.executorCredentialRefresh(this.executorCredentialRefresh)
.build();
}

Expand Down Expand Up @@ -324,6 +377,7 @@ public boolean equals(Object o) {
return pushDownFilters == that.pushDownFilters
&& batchSize == that.batchSize
&& topNPushDown == that.topNPushDown
&& executorCredentialRefresh == that.executorCredentialRefresh
&& Objects.equals(nearest, that.nearest)
&& Objects.equals(datasetUri, that.datasetUri)
&& Objects.equals(blockSize, that.blockSize)
Expand All @@ -347,7 +401,8 @@ public int hashCode() {
nearest,
topNPushDown,
storageOptions,
tableId);
tableId,
executorCredentialRefresh);
}

/** Builder for creating LanceSparkReadOptions instances. */
Expand All @@ -365,6 +420,7 @@ public static class Builder {
private LanceNamespace namespace;
private List<String> tableId;
private String catalogName;
private boolean executorCredentialRefresh = DEFAULT_EXECUTOR_CREDENTIAL_REFRESH;

private Builder() {}

Expand Down Expand Up @@ -442,6 +498,11 @@ public Builder catalogName(String catalogName) {
return this;
}

public Builder executorCredentialRefresh(boolean executorCredentialRefresh) {
this.executorCredentialRefresh = executorCredentialRefresh;
return this;
}

/**
* Parses options from a map, extracting read-specific settings.
*
Expand All @@ -450,48 +511,64 @@ public Builder catalogName(String catalogName) {
*/
public Builder fromOptions(Map<String, String> options) {
this.storageOptions = new HashMap<>(options);
if (options.containsKey(CONFIG_PUSH_DOWN_FILTERS)) {
this.pushDownFilters = Boolean.parseBoolean(options.get(CONFIG_PUSH_DOWN_FILTERS));
}
if (options.containsKey(CONFIG_BLOCK_SIZE)) {
this.blockSize = Integer.parseInt(options.get(CONFIG_BLOCK_SIZE));
}
if (options.containsKey(CONFIG_VERSION)) {
this.version = Integer.parseInt(options.get(CONFIG_VERSION));
}
if (options.containsKey(CONFIG_INDEX_CACHE_SIZE)) {
this.indexCacheSize = Integer.parseInt(options.get(CONFIG_INDEX_CACHE_SIZE));
}
if (options.containsKey(CONFIG_METADATA_CACHE_SIZE)) {
this.metadataCacheSize = Integer.parseInt(options.get(CONFIG_METADATA_CACHE_SIZE));
}
if (options.containsKey(CONFIG_BATCH_SIZE)) {
int parsedBatchSize = Integer.parseInt(options.get(CONFIG_BATCH_SIZE));
Preconditions.checkArgument(parsedBatchSize > 0, "batch_size must be positive");
this.batchSize = parsedBatchSize;
}
if (options.containsKey(CONFIG_TOP_N_PUSH_DOWN)) {
this.topNPushDown = Boolean.parseBoolean(options.get(CONFIG_TOP_N_PUSH_DOWN));
}
if (options.containsKey(CONFIG_NEAREST)) {
String json = options.get(CONFIG_NEAREST);
nearest(json);
}
parseTypedFlags(options);
return this;
}

/**
* Merges catalog config options as defaults (read options override).
*
* <p>Also promotes recognized typed flags from the catalog config into their corresponding
* Builder fields so that catalog-level settings (e.g. {@code spark.sql.catalog.<name>.<key>})
* take effect on paths that do not later go through {@link #fromOptions(Map)} β€” notably SQL DML
* (DELETE / UPDATE / MERGE INTO) and plain SELECT without per-read {@code .option(...)}.
*
* @param catalogConfig the catalog config
* @return this builder
*/
public Builder withCatalogDefaults(LanceSparkCatalogConfig catalogConfig) {
// Merge storage options: catalog options are defaults, current options override
Map<String, String> merged = new HashMap<>(catalogConfig.getStorageOptions());
merged.putAll(this.storageOptions);
this.storageOptions = merged;
return this;
return fromOptions(merged);
}

/**
* Applies typed-flag parsing for every known read option present in {@code opts}. Shared by
* {@link #fromOptions(Map)} and {@link #withCatalogDefaults(LanceSparkCatalogConfig)} so that
* both call sites stay in sync and catalog-level configs reach the typed fields.
*/
private void parseTypedFlags(Map<String, String> opts) {
if (opts.containsKey(CONFIG_PUSH_DOWN_FILTERS)) {
this.pushDownFilters = Boolean.parseBoolean(opts.get(CONFIG_PUSH_DOWN_FILTERS));
}
if (opts.containsKey(CONFIG_BLOCK_SIZE)) {
this.blockSize = Integer.parseInt(opts.get(CONFIG_BLOCK_SIZE));
}
if (opts.containsKey(CONFIG_VERSION)) {
this.version = Integer.parseInt(opts.get(CONFIG_VERSION));
}
if (opts.containsKey(CONFIG_INDEX_CACHE_SIZE)) {
this.indexCacheSize = Integer.parseInt(opts.get(CONFIG_INDEX_CACHE_SIZE));
}
if (opts.containsKey(CONFIG_METADATA_CACHE_SIZE)) {
this.metadataCacheSize = Integer.parseInt(opts.get(CONFIG_METADATA_CACHE_SIZE));
}
if (opts.containsKey(CONFIG_BATCH_SIZE)) {
int parsedBatchSize = Integer.parseInt(opts.get(CONFIG_BATCH_SIZE));
Preconditions.checkArgument(parsedBatchSize > 0, "batch_size must be positive");
this.batchSize = parsedBatchSize;
}
if (opts.containsKey(CONFIG_TOP_N_PUSH_DOWN)) {
this.topNPushDown = Boolean.parseBoolean(opts.get(CONFIG_TOP_N_PUSH_DOWN));
}
if (opts.containsKey(CONFIG_NEAREST)) {
nearest(opts.get(CONFIG_NEAREST));
}
if (opts.containsKey(CONFIG_EXECUTOR_CREDENTIAL_REFRESH)) {
this.executorCredentialRefresh =
Boolean.parseBoolean(opts.get(CONFIG_EXECUTOR_CREDENTIAL_REFRESH));
}
}

public LanceSparkReadOptions build() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,18 @@ public static LanceFragmentScanner create(int fragmentId, LanceInputPartition in
Dataset dataset = null;
try {
LanceSparkReadOptions readOptions = inputPartition.getReadOptions();
if (inputPartition.getNamespaceImpl() != null) {
// Optionally rebuild the namespace client on the executor so the dataset open routes through
// Utils.OpenDatasetBuilder's namespaceClient branch. This preserves the storage options
// provider on the Rust side, which refreshes short-lived vended credentials (e.g. STS
// tokens) during long-running scans. The price is an eager describeTable() RPC against the
// namespace on every fragment open.
//
// For catalogs whose backing service authenticates per-call (e.g. Hive Metastore over
// Kerberos) executors typically lack a TGT and that RPC fails with "GSS initiate failed".
// Setting LanceSparkReadOptions.CONFIG_EXECUTOR_CREDENTIAL_REFRESH=false makes executors
// skip the rebuild and open the dataset by URI using the initialStorageOptions the driver
// already obtained, at the cost of losing the Rust-side credential refresh callback.
if (inputPartition.getNamespaceImpl() != null && readOptions.isExecutorCredentialRefresh()) {
if (LanceRuntime.useNamespaceOnWorkers(inputPartition.getNamespaceImpl())) {
readOptions.setNamespace(
LanceRuntime.getOrCreateNamespace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class LanceSparkReadOptionsSerializationTest {

Expand Down Expand Up @@ -106,4 +109,121 @@ public void testUseIndexSerialization() throws IOException, ClassNotFoundExcepti
deserializedOptionsTrue.getNearest().isUseIndex(),
"useIndex should remain true after serialization/deserialization");
}

@Test
public void testExecutorCredentialRefreshDefaultsToTrue() {
LanceSparkReadOptions options =
LanceSparkReadOptions.builder().datasetUri("s3://bucket/path").build();
Assertions.assertTrue(
options.isExecutorCredentialRefresh(),
"executor_credential_refresh must default to true to preserve existing behavior");
}

@Test
public void testExecutorCredentialRefreshParsedFromOptions() {
LanceSparkReadOptions optionsFalse =
LanceSparkReadOptions.from(
Collections.singletonMap(
LanceSparkReadOptions.CONFIG_EXECUTOR_CREDENTIAL_REFRESH, "false"),
"s3://bucket/path");
Assertions.assertFalse(optionsFalse.isExecutorCredentialRefresh());

LanceSparkReadOptions optionsTrue =
LanceSparkReadOptions.from(
Collections.singletonMap(
LanceSparkReadOptions.CONFIG_EXECUTOR_CREDENTIAL_REFRESH, "true"),
"s3://bucket/path");
Assertions.assertTrue(optionsTrue.isExecutorCredentialRefresh());
}

@Test
public void testExecutorCredentialRefreshSurvivesSerialization()
throws IOException, ClassNotFoundException {
LanceSparkReadOptions options =
LanceSparkReadOptions.builder()
.datasetUri("s3://bucket/path")
.executorCredentialRefresh(false)
.build();
Assertions.assertFalse(options.isExecutorCredentialRefresh());

ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
oos.writeObject(options);
}
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
LanceSparkReadOptions deserialized;
try (ObjectInputStream ois = new ObjectInputStream(bais)) {
deserialized = (LanceSparkReadOptions) ois.readObject();
}

Assertions.assertFalse(
deserialized.isExecutorCredentialRefresh(),
"executor_credential_refresh must survive Java serialization (driver -> executor handoff)");
}

@Test
public void testExecutorCredentialRefreshPreservedByWithVersion() {
LanceSparkReadOptions options =
LanceSparkReadOptions.builder()
.datasetUri("s3://bucket/path")
.executorCredentialRefresh(false)
.build();

LanceSparkReadOptions pinned = options.withVersion(7);
Assertions.assertFalse(
pinned.isExecutorCredentialRefresh(),
"withVersion() must propagate the executor_credential_refresh flag");
}

/**
* Catalog-level config (set via {@code --conf spark.sql.catalog.<name>.<key>}) is the only route
* available to SQL DML (DELETE / UPDATE / MERGE INTO), which has no per-statement {@code
* .option(...)} attach point. This test guards the catalog-conf path.
*/
@Test
public void testExecutorCredentialRefreshFromCatalogDefaults() {
Map<String, String> catalogOpts = new HashMap<>();
catalogOpts.put(LanceSparkReadOptions.CONFIG_EXECUTOR_CREDENTIAL_REFRESH, "false");
LanceSparkCatalogConfig catalogConfig = LanceSparkCatalogConfig.from(catalogOpts);

LanceSparkReadOptions options =
LanceSparkReadOptions.builder()
.datasetUri("s3://bucket/path")
.withCatalogDefaults(catalogConfig)
.build();

Assertions.assertFalse(
options.isExecutorCredentialRefresh(),
"executor_credential_refresh set at catalog level must land in the typed field "
+ "so it takes effect for SELECT without .option(...) and for SQL DML");
}

/**
* Spark's scan-time options (via {@code spark.read.option(...)}) go through a second {@code
* fromOptions(mergedMap)} rebuild in {@code LanceDataset.newScanBuilder}. Per-read settings must
* win over catalog-level defaults.
*/
@Test
public void testPerReadOptionOverridesCatalogDefaults() {
Map<String, String> catalogOpts = new HashMap<>();
catalogOpts.put(LanceSparkReadOptions.CONFIG_EXECUTOR_CREDENTIAL_REFRESH, "false");
LanceSparkCatalogConfig catalogConfig = LanceSparkCatalogConfig.from(catalogOpts);

// Simulate the rebuild path in LanceDataset.newScanBuilder: the builder starts by applying
// the catalog defaults, then fromOptions() replays against the merged (catalog + per-read)
// map where the per-read value wins.
Map<String, String> merged = new HashMap<>(catalogConfig.getStorageOptions());
merged.put(LanceSparkReadOptions.CONFIG_EXECUTOR_CREDENTIAL_REFRESH, "true");

LanceSparkReadOptions options =
LanceSparkReadOptions.builder()
.datasetUri("s3://bucket/path")
.withCatalogDefaults(catalogConfig)
.fromOptions(merged)
.build();

Assertions.assertTrue(
options.isExecutorCredentialRefresh(),
"per-read .option(...) must override the catalog-level default");
}
}
Loading