Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions python/python/benchmarks/test_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,49 @@ def test_ann_with_refine(test_dataset, benchmark):
assert result.num_rows > 0


# Number of query vectors generated by each of the batch-ANN benchmarks
# (the `query_ann_batch` benchmark group).
N_BATCH_QUERIES = 32


@pytest.mark.benchmark(group="query_ann_batch")
def test_batch_ann_search(test_dataset, benchmark):
    """Benchmark one ANN request that carries ``N_BATCH_QUERIES`` query vectors.

    One request carrying all query vectors: the index shares each partition's
    scan across the batch (issue #6822).
    """
    # Draw the queries from a locally seeded generator so every run measures
    # the exact same vectors; the unseeded global RNG made timings depend on
    # whichever sample happened to be drawn.
    rng = np.random.default_rng(0)
    queries = rng.standard_normal((N_BATCH_QUERIES, N_DIMS)).astype(np.float32)
    result = benchmark(
        test_dataset.to_table,
        columns=[],
        with_row_id=True,
        nearest=dict(
            column="vector",
            q=queries,
            k=100,
            nprobes=10,
        ),
    )
    assert result.num_rows > 0


@pytest.mark.benchmark(group="query_ann_batch")
def test_repeated_single_ann_search(test_dataset, benchmark):
    """Baseline: ``N_BATCH_QUERIES`` query vectors, one indexed search each."""
    # Draw the queries from a locally seeded generator so the input is fixed
    # across runs; an unseeded draw gave this baseline a different sample on
    # every invocation, which makes its timings hard to compare.
    rng = np.random.default_rng(0)
    queries = rng.standard_normal((N_BATCH_QUERIES, N_DIMS)).astype(np.float32)

    def run():
        # Issue the searches sequentially, one request per query vector.
        for q in queries:
            test_dataset.to_table(
                columns=[],
                with_row_id=True,
                nearest=dict(
                    column="vector",
                    q=q,
                    k=100,
                    nprobes=10,
                ),
            )

    benchmark(run)


@pytest.mark.benchmark(group="query_ann")
@pytest.mark.parametrize("selectivity", (0.25, 0.75))
@pytest.mark.parametrize("prefilter", (False, True))
Expand Down
33 changes: 33 additions & 0 deletions python/python/tests/test_vector_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,39 @@ def test_batch_flat_query_matches_repeated_single_queries(dataset, queries):
)


@pytest.mark.parametrize(
    "queries",
    [
        np.random.randn(3, 128).astype(np.float32),
        np.random.randn(1, 128).astype(np.float32),
    ],
    ids=["three_queries", "single_query"],
)
def test_batch_indexed_query_matches_repeated_single_queries(indexed_dataset, queries):
    """Indexed batch search must agree with issuing each query on its own.

    Checks the output columns and the per-query grouping of the batch result,
    then delegates the row-level comparison to
    ``_assert_batch_matches_single_queries``.
    """
    k = 5

    # nprobes is chosen to cover every partition so the shared-scan batch path
    # and the repeated single-query path search the same partitions
    # deterministically (the partition count comes from the fixture, which is
    # not visible here).
    nearest_kwargs = {"use_index": True, "nprobes": 4}
    nearest = {"column": "vector", "q": queries, "k": k}
    nearest.update(nearest_kwargs)
    batch = indexed_dataset.to_table(columns=["id"], nearest=nearest)

    assert batch.column_names == ["query_index", "id", "_distance"]

    # Each query contributes exactly k consecutive rows tagged with its index.
    expected_query_index = [i for i in range(queries.shape[0]) for _ in range(k)]
    assert batch["query_index"].to_pylist() == expected_query_index

    _assert_batch_matches_single_queries(
        indexed_dataset,
        queries,
        k=k,
        nearest_kwargs=nearest_kwargs,
    )


def _assert_batch_matches_single_queries(ds, queries, k, nearest_kwargs):
batch = ds.to_table(
columns=["id"],
Expand Down
48 changes: 48 additions & 0 deletions rust/lance-index/src/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,54 @@ pub trait VectorIndex: Send + Sync + std::fmt::Debug + Index {
)))
}

/// Reports whether this index can search multiple query vectors in a single
/// pass via [`VectorIndex::search_partitions_batch`], reading each partition's
/// storage once and scoring every query that probes it.
///
/// The default implementation returns `false`. The batch scan planner is
/// expected to fall back to repeated single-query search for indices that
/// return `false`, so implementors should only override this together with
/// [`VectorIndex::search_partitions_batch`].
fn supports_batch_partition_search(&self) -> bool {
false
}

/// Searches several query vectors at once against a shared set of partitions.
///
/// # Arguments
///
/// * `query` - `query.key` carries every query vector back to back, so its
///   length is `query_count * dim` with
///   `query_count == partitions_per_query.len()`.
/// * `partitions_per_query` - entry `i` is the ranked partition ids for
///   query `i`.
/// * `q_c_dists_per_query` - entry `i` is the query-to-centroid distances for
///   query `i`, matching `partitions_per_query[i]`.
/// * `pre_filter` - the prefilter handed to the search.
/// * `metrics` - collector for search metrics.
///
/// # Returns
///
/// One [RecordBatch] per query, in query order, using the
/// [`VECTOR_RESULT_SCHEMA`] (`_distance`, `_rowid`) and holding at most
/// `query.k` rows each. Implementations should load each distinct partition's
/// storage only once and score every query assigned to it against that data.
///
/// # Errors
///
/// The default implementation always fails with a "not supported" error;
/// callers must check [`VectorIndex::supports_batch_partition_search`] first.
#[allow(clippy::too_many_arguments)]
async fn search_partitions_batch(
    self: Arc<Self>,
    query: Query,
    partitions_per_query: Vec<Arc<UInt32Array>>,
    q_c_dists_per_query: Vec<Arc<Float32Array>>,
    pre_filter: Arc<dyn PreFilter>,
    metrics: Arc<dyn MetricsCollector>,
) -> Result<Vec<RecordBatch>>
where
    Self: 'static,
{
    // The default has no use for its inputs. Consume them explicitly so the
    // parameters keep their documented names without unused-variable warnings.
    drop(query);
    drop(partitions_per_query);
    drop(q_c_dists_per_query);
    drop(pre_filter);
    drop(metrics);

    Err(Error::not_supported(
        "batch partition search is not supported for this index",
    ))
}

/// Whether the index is loadable by IVF, i.e. whether it can act as a
/// sub-index that IVF loads on demand.
fn is_loadable(&self) -> bool;
Expand Down
129 changes: 125 additions & 4 deletions rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ use crate::io::exec::{
AddRowAddrExec, FilterPlan as ExprFilterPlan, KNNVectorDistanceExec, LancePushdownScanExec,
LanceScanExec, Planner, PreFilterSource, ScanConfig, TakeExec,
knn::{
KnnBatchParams, QUERY_INDEX_COL, knn_empty_result_schema, new_knn_exec, query_index_field,
KnnBatchParams, QUERY_INDEX_COL, knn_empty_result_schema, new_knn_batch_exec, new_knn_exec,
query_index_field,
},
project,
};
Expand Down Expand Up @@ -3772,7 +3773,15 @@ impl Scanner {

if let Some((index_name, index_segments, index_metric)) = index_and_segments {
if self.is_batch_nearest {
return self.batch_indexed_vector_search(filter_plan, &q).await;
return self
.batch_indexed_vector_search(
filter_plan,
&q,
&index_name,
&index_segments,
index_metric,
)
.await;
}

log::trace!("index found for vector search");
Expand Down Expand Up @@ -3854,11 +3863,78 @@ impl Scanner {
}
}

/// Whether a batch (multi-query) vector search can use the shared-scan
/// indexed fast path ([`new_knn_batch_exec`]) instead of running one indexed
/// search per query vector.
///
/// Returns `Ok(true)` only when all of the following hold:
///
/// * no refine step is requested, i.e. `q.refine_factor` is `None`;
/// * every entry of `index_segments` carries non-empty index details whose
///   derived index type starts with `IVF_` and does not contain `HNSW`
///   (segments with missing or empty details count as unsupported);
/// * `fast_search` is enabled (unindexed fragments are ignored), or none of
///   the target fragments is unindexed for `index_name`.
///
/// # Errors
///
/// Propagates any error raised while listing the unindexed fragments.
async fn batch_index_search_supported(
    &self,
    index_name: &str,
    index_segments: &[IndexMetadata],
    q: &Query,
) -> Result<bool> {
    // The fast path has no refine step, so any requested refine must fall
    // back to the per-query loop. This includes a factor of 1: per
    // `Scanner::refine`, results are still re-ranked with the original
    // vectors even when no extra candidates are fetched, so gating on
    // `> 1` would silently skip that re-ranking.
    if q.refine_factor.is_some() {
        return Ok(false);
    }

    let all_ivf_flat_style = index_segments.iter().all(|index| {
        index
            .index_details
            .as_ref()
            .filter(|details| !details.value.is_empty())
            .map(|details| {
                let index_type =
                    crate::index::vector::details::derive_vector_index_type(details);
                index_type.starts_with("IVF_") && !index_type.contains("HNSW")
            })
            .unwrap_or(false)
    });
    if !all_ivf_flat_style {
        return Ok(false);
    }

    if self.fast_search {
        return Ok(true);
    }

    // The batch node only searches indexed partitions, so any unindexed
    // target fragment would silently drop rows; fall back in that case.
    let unindexed_fragments =
        self.retain_target_fragments(self.dataset.unindexed_fragments(index_name).await?);
    Ok(unindexed_fragments.is_empty())
}

async fn batch_indexed_vector_search(
&self,
filter_plan: &ExprFilterPlan,
q: &Query,
index_name: &str,
index_segments: &[IndexMetadata],
index_metric: MetricType,
) -> Result<Arc<dyn ExecutionPlan>> {
// Fast path: when every index segment is an IVF index with a flat-style
// sub-index (IVF_FLAT/PQ/SQ/RQ), search all query vectors in a single
// pass that reads each partition's storage once and shares the prefilter
// across the batch. HNSW, refine, and mixed indexed/unindexed scans fall
// back to the per-query loop below, which never regresses behavior.
if self
.batch_index_search_supported(index_name, index_segments, q)
.await?
{
let mut batch_query = q.clone();
batch_query.metric_type = Some(index_metric);
let prefilter_source = self
.prefilter_source(filter_plan, self.get_indexed_frags(index_segments))
.await?;
return new_knn_batch_exec(
self.dataset.clone(),
index_segments,
&batch_query,
self.nearest_query_count,
prefilter_source,
);
}

let query_dim = q.key.len() / self.nearest_query_count;
let mut query_plans = Vec::with_capacity(self.nearest_query_count);

Expand Down Expand Up @@ -6221,8 +6297,13 @@ mod test {

let plan = scan.explain_plan(false).await.unwrap();
assert!(
plan.contains("ANNSubIndex"),
"batch KNN should use the vector index when available, got:\n{}",
plan.contains("ANNIvfBatch"),
"IVF batch KNN should use the shared-scan batch node, got:\n{}",
plan
);
assert!(
!plan.contains("ANNSubIndex"),
"IVF batch KNN should not fall back to per-query ANN search, got:\n{}",
plan
);
assert!(
Expand All @@ -6237,6 +6318,9 @@ mod test {
batch[QUERY_INDEX_COL].as_primitive::<Int32Type>().values(),
&[0, 0, 1, 1]
);
// Shared-scan batch search must return the same rows/distances as
// issuing the queries one at a time against the index.
assert_batch_matches_single_queries(dataset, &batch, &query_values, 2, true, None).await;

let batch = dataset
.scan()
Expand All @@ -6259,6 +6343,43 @@ mod test {
.await;
}

/// The shared-scan batch path does not handle `refine_factor` yet. With a
/// refine step requested, the scanner has to plan the per-query indexed loop
/// and must still emit results grouped per query.
#[tokio::test]
async fn test_batch_knn_indexed_refine_falls_back() {
    let mut vector_ds = TestVectorDataset::new(LanceFileVersion::Stable, true)
        .await
        .unwrap();
    vector_ds.make_vector_index().await.unwrap();
    let dataset = &vector_ds.dataset;
    let (queries, _) = batch_knn_two_queries();

    let mut scanner = dataset.scan();
    scanner.nearest("vec", &queries, 2).unwrap();
    scanner.refine(2);
    scanner.project(&["i"]).unwrap();

    // Plan shape: no batch node, per-query ANN nodes instead.
    let explained = scanner.explain_plan(false).await.unwrap();
    assert!(
        !explained.contains("ANNIvfBatch"),
        "refine must not use the shared-scan batch node, got:\n{}",
        explained
    );
    assert!(
        explained.contains("ANNSubIndex"),
        "refine batch search should fall back to the per-query indexed loop, got:\n{}",
        explained
    );

    // Results: two rows for each of the two queries, in query order.
    let result = scanner.try_into_batch().await.unwrap();
    assert_query_index_field(&result);
    let query_indices = result[QUERY_INDEX_COL]
        .as_primitive::<Int32Type>()
        .values();
    assert_eq!(query_indices, &[0, 0, 1, 1]);
}

#[tokio::test]
async fn test_can_project_distance() {
let test_ds = TestVectorDataset::new(LanceFileVersion::Stable, true)
Expand Down
Loading
Loading