diff --git a/python/python/benchmarks/test_search.py b/python/python/benchmarks/test_search.py
index 61076e61687..70b6786d4ed 100644
--- a/python/python/benchmarks/test_search.py
+++ b/python/python/benchmarks/test_search.py
@@ -208,6 +208,58 @@ def test_ann_with_refine(test_dataset, benchmark):
     assert result.num_rows > 0
 
 
+N_BATCH_QUERIES = 32
+# Fixed seed: the batch benchmark and its per-query baseline must search the
+# exact same vectors for their timings to be directly comparable.
+BATCH_QUERY_SEED = 0
+
+
+def _batch_queries():
+    """Return the seeded float32 query matrix shared by both batch benchmarks."""
+    rng = np.random.default_rng(BATCH_QUERY_SEED)
+    return rng.standard_normal((N_BATCH_QUERIES, N_DIMS), dtype=np.float32)
+
+
+@pytest.mark.benchmark(group="query_ann_batch")
+def test_batch_ann_search(test_dataset, benchmark):
+    # One request carrying all query vectors: the index shares each partition's
+    # scan across the batch (issue #6822).
+    queries = _batch_queries()
+    result = benchmark(
+        test_dataset.to_table,
+        columns=[],
+        with_row_id=True,
+        nearest=dict(
+            column="vector",
+            q=queries,
+            k=100,
+            nprobes=10,
+        ),
+    )
+    assert result.num_rows > 0
+
+
+@pytest.mark.benchmark(group="query_ann_batch")
+def test_repeated_single_ann_search(test_dataset, benchmark):
+    # Baseline: the same query vectors issued one indexed search at a time.
+    queries = _batch_queries()
+
+    def run():
+        for q in queries:
+            test_dataset.to_table(
+                columns=[],
+                with_row_id=True,
+                nearest=dict(
+                    column="vector",
+                    q=q,
+                    k=100,
+                    nprobes=10,
+                ),
+            )
+
+    benchmark(run)
+
+
 @pytest.mark.benchmark(group="query_ann")
 @pytest.mark.parametrize("selectivity", (0.25, 0.75))
 @pytest.mark.parametrize("prefilter", (False, True))
diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py
index 292b8079706..fe44fd5b2d1 100644
--- a/python/python/tests/test_vector_index.py
+++ b/python/python/tests/test_vector_index.py
@@ -222,6 +222,39 @@ def test_batch_flat_query_matches_repeated_single_queries(dataset, queries):
     )
 
 
+@pytest.mark.parametrize(
+    "queries",
+    [
+        np.random.randn(3, 128).astype(np.float32),
+        np.random.randn(1, 128).astype(np.float32),
+    ],
+    ids=["three_queries", "single_query"],
+)
+def test_batch_indexed_query_matches_repeated_single_queries(indexed_dataset, queries):
+    k = 5
+    query_count = queries.shape[0]
+
+    # nprobes covers every partition so the shared-scan batch path and the
+    # repeated single-query path search the same partitions deterministically.
+ nearest_kwargs = {"use_index": True, "nprobes": 4} + batch = indexed_dataset.to_table( + columns=["id"], + nearest={"column": "vector", "q": queries, "k": k, **nearest_kwargs}, + ) + + assert batch.column_names == ["query_index", "id", "_distance"] + assert batch["query_index"].to_pylist() == sum( + [[i] * k for i in range(query_count)], [] + ) + + _assert_batch_matches_single_queries( + indexed_dataset, + queries, + k=k, + nearest_kwargs=nearest_kwargs, + ) + + def _assert_batch_matches_single_queries(ds, queries, k, nearest_kwargs): batch = ds.to_table( columns=["id"], diff --git a/rust/lance-index/src/vector.rs b/rust/lance-index/src/vector.rs index 3c5a6601a8a..f2018a410ee 100644 --- a/rust/lance-index/src/vector.rs +++ b/rust/lance-index/src/vector.rs @@ -353,6 +353,54 @@ pub trait VectorIndex: Send + Sync + std::fmt::Debug + Index { ))) } + /// Whether this index can search multiple query vectors in a single pass + /// via [`VectorIndex::search_partitions_batch`], reading each partition's + /// storage once and scoring every query that probes it. + /// + /// Defaults to `false`; the batch scan planner falls back to repeated + /// single-query search for indices that return `false`. + fn supports_batch_partition_search(&self) -> bool { + false + } + + /// Search a batch of query vectors against a shared set of partitions. + /// + /// `query.key` holds all query vectors concatenated (length + /// `query_count * dim`, where `query_count == partitions_per_query.len()`). + /// `partitions_per_query[i]` / `q_c_dists_per_query[i]` are the ranked + /// partition ids and query-to-centroid distances for query `i`. + /// + /// Returns one [RecordBatch] per query (in query order) with the + /// [`VECTOR_RESULT_SCHEMA`] (`_distance`, `_rowid`) and at most `query.k` + /// rows each. Implementations should read each distinct partition's storage + /// only once and score every query assigned to it against the loaded data. 
+ /// + /// The default implementation returns an error; callers must gate on + /// [`VectorIndex::supports_batch_partition_search`]. + #[allow(clippy::too_many_arguments)] + async fn search_partitions_batch( + self: Arc, + query: Query, + partitions_per_query: Vec>, + q_c_dists_per_query: Vec>, + pre_filter: Arc, + metrics: Arc, + ) -> Result> + where + Self: 'static, + { + let _ = ( + query, + partitions_per_query, + q_c_dists_per_query, + pre_filter, + metrics, + ); + Err(Error::not_supported( + "batch partition search is not supported for this index", + )) + } + /// If the index is loadable by IVF, so it can be a sub-index that /// is loaded on demand by IVF. fn is_loadable(&self) -> bool; diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 1112721bb33..841642156cf 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -101,7 +101,8 @@ use crate::io::exec::{ AddRowAddrExec, FilterPlan as ExprFilterPlan, KNNVectorDistanceExec, LancePushdownScanExec, LanceScanExec, Planner, PreFilterSource, ScanConfig, TakeExec, knn::{ - KnnBatchParams, QUERY_INDEX_COL, knn_empty_result_schema, new_knn_exec, query_index_field, + KnnBatchParams, QUERY_INDEX_COL, knn_empty_result_schema, new_knn_batch_exec, new_knn_exec, + query_index_field, }, project, }; @@ -3772,7 +3773,15 @@ impl Scanner { if let Some((index_name, index_segments, index_metric)) = index_and_segments { if self.is_batch_nearest { - return self.batch_indexed_vector_search(filter_plan, &q).await; + return self + .batch_indexed_vector_search( + filter_plan, + &q, + &index_name, + &index_segments, + index_metric, + ) + .await; } log::trace!("index found for vector search"); @@ -3854,11 +3863,78 @@ impl Scanner { } } + /// Whether a batch (multi-query) vector search can use the shared-scan + /// indexed fast path ([`new_knn_batch_exec`]) instead of running one indexed + /// search per query vector. 
+ /// + /// Requires: no refine step, every segment an IVF index with a flat-style + /// sub-index (i.e. not HNSW), and all target fragments indexed (or + /// `fast_search`, which ignores unindexed fragments). + async fn batch_index_search_supported( + &self, + index_name: &str, + index_segments: &[IndexMetadata], + q: &Query, + ) -> Result { + if matches!(q.refine_factor, Some(rf) if rf > 1) { + return Ok(false); + } + let all_ivf_flat_style = index_segments.iter().all(|index| { + index + .index_details + .as_ref() + .filter(|details| !details.value.is_empty()) + .map(|details| { + let index_type = + crate::index::vector::details::derive_vector_index_type(details); + index_type.starts_with("IVF_") && !index_type.contains("HNSW") + }) + .unwrap_or(false) + }); + if !all_ivf_flat_style { + return Ok(false); + } + if self.fast_search { + return Ok(true); + } + // The batch node only searches indexed partitions, so any unindexed + // target fragment would silently drop rows; fall back in that case. + let unindexed_fragments = + self.retain_target_fragments(self.dataset.unindexed_fragments(index_name).await?); + Ok(unindexed_fragments.is_empty()) + } + async fn batch_indexed_vector_search( &self, filter_plan: &ExprFilterPlan, q: &Query, + index_name: &str, + index_segments: &[IndexMetadata], + index_metric: MetricType, ) -> Result> { + // Fast path: when every index segment is an IVF index with a flat-style + // sub-index (IVF_FLAT/PQ/SQ/RQ), search all query vectors in a single + // pass that reads each partition's storage once and shares the prefilter + // across the batch. HNSW, refine, and mixed indexed/unindexed scans fall + // back to the per-query loop below, which never regresses behavior. + if self + .batch_index_search_supported(index_name, index_segments, q) + .await? 
+ { + let mut batch_query = q.clone(); + batch_query.metric_type = Some(index_metric); + let prefilter_source = self + .prefilter_source(filter_plan, self.get_indexed_frags(index_segments)) + .await?; + return new_knn_batch_exec( + self.dataset.clone(), + index_segments, + &batch_query, + self.nearest_query_count, + prefilter_source, + ); + } + let query_dim = q.key.len() / self.nearest_query_count; let mut query_plans = Vec::with_capacity(self.nearest_query_count); @@ -6221,8 +6297,13 @@ mod test { let plan = scan.explain_plan(false).await.unwrap(); assert!( - plan.contains("ANNSubIndex"), - "batch KNN should use the vector index when available, got:\n{}", + plan.contains("ANNIvfBatch"), + "IVF batch KNN should use the shared-scan batch node, got:\n{}", + plan + ); + assert!( + !plan.contains("ANNSubIndex"), + "IVF batch KNN should not fall back to per-query ANN search, got:\n{}", plan ); assert!( @@ -6237,6 +6318,9 @@ mod test { batch[QUERY_INDEX_COL].as_primitive::().values(), &[0, 0, 1, 1] ); + // Shared-scan batch search must return the same rows/distances as + // issuing the queries one at a time against the index. + assert_batch_matches_single_queries(dataset, &batch, &query_values, 2, true, None).await; let batch = dataset .scan() @@ -6259,6 +6343,43 @@ mod test { .await; } + /// `refine_factor` is not yet supported by the shared-scan batch path, so + /// the scanner must fall back to the per-query indexed loop and still + /// produce correctly grouped per-query results. 
+ #[tokio::test] + async fn test_batch_knn_indexed_refine_falls_back() { + let mut test_ds = TestVectorDataset::new(LanceFileVersion::Stable, true) + .await + .unwrap(); + test_ds.make_vector_index().await.unwrap(); + let dataset = &test_ds.dataset; + let (queries, _query_values) = batch_knn_two_queries(); + + let mut scan = dataset.scan(); + scan.nearest("vec", &queries, 2).unwrap(); + scan.refine(2); + scan.project(&["i"]).unwrap(); + + let plan = scan.explain_plan(false).await.unwrap(); + assert!( + !plan.contains("ANNIvfBatch"), + "refine must not use the shared-scan batch node, got:\n{}", + plan + ); + assert!( + plan.contains("ANNSubIndex"), + "refine batch search should fall back to the per-query indexed loop, got:\n{}", + plan + ); + + let batch = scan.try_into_batch().await.unwrap(); + assert_query_index_field(&batch); + assert_eq!( + batch[QUERY_INDEX_COL].as_primitive::().values(), + &[0, 0, 1, 1] + ); + } + #[tokio::test] async fn test_can_project_distance() { let test_ds = TestVectorDataset::new(LanceFileVersion::Stable, true) diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 202a4423d49..41c2fe42453 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -1753,6 +1753,147 @@ impl VectorIndex for IVFInd ))) } + fn supports_batch_partition_search(&self) -> bool { + S::supports_global_topk_heap() + } + + async fn search_partitions_batch( + self: Arc, + query: Query, + partitions_per_query: Vec>, + q_c_dists_per_query: Vec>, + pre_filter: Arc, + metrics: Arc, + ) -> Result> { + if !S::supports_global_topk_heap() { + return Err(Error::not_supported( + "batch partition search requires a global top-k heap sub-index", + )); + } + let query_count = partitions_per_query.len(); + if q_c_dists_per_query.len() != query_count { + return Err(Error::invalid_input(format!( + "batch partition search: {query_count} query partition lists but {} distance lists", + 
q_c_dists_per_query.len() + ))); + } + if query_count == 0 { + return Ok(Vec::new()); + } + if !query.key.len().is_multiple_of(query_count) { + return Err(Error::invalid_input(format!( + "batch partition search: query key length {} is not divisible by query count {query_count}", + query.key.len() + ))); + } + let dim = query.key.len() / query_count; + + // Per-query immutable search state: the query vector slice and the + // optional Rabit raw-query context both depend only on the query vector, + // so compute them once up front rather than per probed partition. + let mut base_queries = Vec::with_capacity(query_count); + let mut raw_query_contexts = Vec::with_capacity(query_count); + for query_index in 0..query_count { + if partitions_per_query[query_index].len() != q_c_dists_per_query[query_index].len() { + return Err(Error::invalid_input(format!( + "batch partition search: query {query_index} has {} partitions but {} distances", + partitions_per_query[query_index].len(), + q_c_dists_per_query[query_index].len() + ))); + } + let mut single_query = query.clone(); + single_query.key = query.key.slice(query_index * dim, dim); + raw_query_contexts.push(self.prepare_rq_raw_query_context(&single_query.key)?); + base_queries.push(single_query); + } + + // Invert the per-query partition lists so each distinct partition is + // loaded once and scored against every query that probes it. + let mut assignments: HashMap> = HashMap::new(); + for (query_index, (parts, dists)) in partitions_per_query + .iter() + .zip(q_c_dists_per_query.iter()) + .enumerate() + { + for (part_id, dist_q_c) in parts.values().iter().zip(dists.values().iter()) { + assignments + .entry(*part_id) + .or_default() + .push((query_index, *dist_q_c)); + } + } + + pre_filter.wait_for_ready().await?; + + // Load each distinct partition's storage exactly once. This shared I/O + // is the whole point of batch search versus repeated single queries. 
+ let load_parallelism = get_num_compute_intensive_cpus().max(1); + let load_index = self.clone(); + let load_metrics = metrics.clone(); + let loaded = stream::iter(assignments.into_iter()) + .map(move |(part_id, probing_queries)| { + let index = load_index.clone(); + let metrics = load_metrics.clone(); + async move { + let part_entry = index + .load_partition(part_id as usize, true, metrics.as_ref()) + .await?; + Result::Ok((part_id as usize, part_entry, probing_queries)) + } + }) + .buffered(load_parallelism) + .try_collect::>() + .await?; + + // Score the loaded partitions into one top-k heap per query. + let use_query_residual = self.use_query_residual; + let use_residual_scratch = self.use_residual_scratch; + let heap_capacity = query.k * query.refine_factor.unwrap_or(1) as usize; + let scratch_pool = self.scratch_pool.clone(); + let index = self.clone(); + let search_metrics = metrics.clone(); + let batches = spawn_cpu(move || -> Result> { + let mut heaps: Vec>> = (0..query_count) + .map(|_| BinaryHeap::with_capacity(heap_capacity)) + .collect(); + scratch_pool.with_scratch(|scratch| -> Result<()> { + for (part_id, part_entry, probing_queries) in &loaded { + let partition_centroid = index.ivf.centroid(*part_id); + for (query_index, dist_q_c) in probing_queries { + let mut single_query = base_queries[*query_index].clone(); + single_query.dist_q_c = *dist_q_c; + let prepared = PreparedPartitionSearch:: { + query: single_query, + pre_filter: pre_filter.clone(), + partition_id: *part_id, + partition_centroid: partition_centroid.clone(), + rq_search_cache: index.rq_search_cache.clone(), + raw_query_context: raw_query_contexts[*query_index].clone(), + part_entry: part_entry.clone(), + _marker: PhantomData, + }; + Self::accumulate_prepared_partition_search( + use_query_residual, + use_residual_scratch, + prepared, + &mut heaps[*query_index], + scratch, + search_metrics.as_ref(), + )?; + } + } + Ok(()) + })?; + heaps + .into_iter() + .map(Self::global_heap_to_batch) 
+ .collect::>>() + }) + .await?; + + Ok(batches) + } + fn is_loadable(&self) -> bool { false } diff --git a/rust/lance/src/io/exec/knn.rs b/rust/lance/src/io/exec/knn.rs index 0ceddf7c5ee..9319617a4da 100644 --- a/rust/lance/src/io/exec/knn.rs +++ b/rust/lance/src/io/exec/knn.rs @@ -8,7 +8,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, LazyLock, Mutex}; use std::time::Instant; -use arrow::array::{Float32Builder, Int32Builder}; +use arrow::array::{Float32Builder, Int32Builder, UInt64Builder}; use arrow::datatypes::{Float32Type, UInt32Type, UInt64Type}; use arrow_array::{Array, Float32Array, UInt32Array, UInt64Array}; use arrow_array::{ @@ -1675,6 +1675,316 @@ impl ExecutionPlan for ANNIvfSubIndexExec { } } +/// Build a batch (multi-query) indexed vector search plan. +/// +/// `query.key` must hold all `query_count` query vectors concatenated. +pub fn new_knn_batch_exec( + dataset: Arc, + indices: &[IndexMetadata], + query: &Query, + query_count: usize, + prefilter_source: PreFilterSource, +) -> Result> { + Ok(Arc::new(ANNIvfBatchExec::try_new( + dataset, + indices.to_vec(), + query.clone(), + query_count, + prefilter_source, + )?)) +} + +/// [ExecutionPlan] for batch (multi-query) IVF vector search. +/// +/// Where the single-query path uses [`ANNIvfPartitionExec`] + +/// [`ANNIvfSubIndexExec`], this node ranks every query vector against the IVF +/// centroids and then asks the index to read each probed partition's storage +/// once, scoring all queries that probe it +/// (via [`VectorIndex::search_partitions_batch`]). The prefilter is built once +/// and shared across all queries. +/// +/// Output schema: `{query_index: Int32, _distance: Float32, _rowid: UInt64}`, +/// sorted by `(query_index, _distance, _rowid)`, with up to `k` rows per query. 
+/// +/// Per-query nprobes are honored statically from the ranking; the adaptive +/// late-search expansion used by the single-query path is not applied, so recall +/// matches repeated single-query search when `minimum_nprobes == maximum_nprobes`. +#[derive(Debug)] +pub struct ANNIvfBatchExec { + dataset: Arc, + indices: Vec, + /// Vector query whose `key` holds all `query_count` vectors concatenated. + query: Query, + query_count: usize, + prefilter_source: PreFilterSource, + properties: Arc, + metrics: ExecutionPlanMetricsSet, +} + +impl ANNIvfBatchExec { + pub fn try_new( + dataset: Arc, + indices: Vec, + query: Query, + query_count: usize, + prefilter_source: PreFilterSource, + ) -> Result { + if indices.is_empty() { + return Err(Error::index( + "ANNIvfBatchExec: no index found for query".to_string(), + )); + } + if query_count == 0 || !query.key.len().is_multiple_of(query_count) { + return Err(Error::invalid_input(format!( + "ANNIvfBatchExec: query key length {} is not divisible by query count {query_count}", + query.key.len() + ))); + } + let properties = Arc::new(PlanProperties::new( + EquivalenceProperties::new(knn_empty_result_schema(true)), + Partitioning::RoundRobinBatch(1), + EmissionType::Final, + Boundedness::Bounded, + )); + Ok(Self { + dataset, + indices, + query, + query_count, + prefilter_source, + properties, + metrics: ExecutionPlanMetricsSet::new(), + }) + } +} + +impl DisplayAs for ANNIvfBatchExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "ANNIvfBatch: query_count={}, k={}, deltas={}", + self.query_count, + self.query.k, + self.indices.len() + ) + } + DisplayFormatType::TreeRender => { + write!( + f, + "ANNIvfBatch\nquery_count={}\nk={}\ndeltas={}", + self.query_count, + self.query.k, + self.indices.len() + ) + } + } + } +} + +impl ExecutionPlan for ANNIvfBatchExec { + fn name(&self) -> &str { + 
"ANNIvfBatchExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + knn_empty_result_schema(true) + } + + fn properties(&self) -> &Arc { + &self.properties + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + + fn children(&self) -> Vec<&Arc> { + match &self.prefilter_source { + PreFilterSource::None => vec![], + PreFilterSource::FilteredRowIds(src) => vec![src], + PreFilterSource::ScalarIndexQuery(src) => vec![src], + } + } + + fn required_input_distribution(&self) -> Vec { + self.children() + .iter() + .map(|_| Distribution::SinglePartition) + .collect() + } + + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> DataFusionResult> { + let prefilter_source = match (&self.prefilter_source, children.len()) { + (PreFilterSource::None, 0) => PreFilterSource::None, + (PreFilterSource::FilteredRowIds(_), 1) => { + PreFilterSource::FilteredRowIds(children.pop().expect("length checked")) + } + (PreFilterSource::ScalarIndexQuery(_), 1) => { + PreFilterSource::ScalarIndexQuery(children.pop().expect("length checked")) + } + _ => { + return Err(DataFusionError::Internal( + "ANNIvfBatchExec given an unexpected number of children".to_string(), + )); + } + }; + Ok(Arc::new(Self { + dataset: self.dataset.clone(), + indices: self.indices.clone(), + query: self.query.clone(), + query_count: self.query_count, + prefilter_source, + properties: self.properties.clone(), + metrics: ExecutionPlanMetricsSet::new(), + })) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> DataFusionResult { + let schema = self.schema(); + let ds = self.dataset.clone(); + let column = self.query.column.clone(); + let indices = self.indices.clone(); + let query = self.query.clone(); + let query_count = self.query_count; + let metrics = Arc::new(AnnIndexMetrics::new(&self.metrics, partition)); + let metrics_clone = metrics.clone(); + let timer = Instant::now(); + + let prefilter_loader = match 
&self.prefilter_source { + PreFilterSource::FilteredRowIds(src_node) => { + let stream = src_node.execute(partition, context)?; + Some(Box::new(FilteredRowIdsToPrefilter(stream)) as Box) + } + PreFilterSource::ScalarIndexQuery(src_node) => { + let stream = src_node.execute(partition, context)?; + Some(Box::new(SelectionVectorToPrefilter(stream)) as Box) + } + PreFilterSource::None => None, + }; + let pre_filter = Arc::new(DatasetPreFilter::new( + ds.clone(), + &indices, + prefilter_loader, + )); + + let result_schema = schema.clone(); + let fut = async move { + let dim = query.key.len() / query_count; + // Per-query candidate (distance, row_id) pairs accumulated across deltas. + let mut candidates: Vec> = vec![Vec::new(); query_count]; + + for index_meta in &indices { + let index = ds + .open_vector_index(&column, &index_meta.uuid, &metrics.index_metrics) + .await?; + if !index.supports_batch_partition_search() { + return Err(DataFusionError::Execution( + "ANNIvfBatchExec: index does not support batch partition search" + .to_string(), + )); + } + // Normalize every query vector once (cosine only) before ranking. + let normalized = normalize_query_for_index(index.as_ref(), query.clone())?; + + let mut partitions_per_query = Vec::with_capacity(query_count); + let mut dists_per_query = Vec::with_capacity(query_count); + for query_index in 0..query_count { + let mut single_query = normalized.clone(); + single_query.key = normalized.key.slice(query_index * dim, dim); + // Rank a fixed set of partitions per query (no adaptive + // expansion). This mirrors the single-query path's early + // search, which always probes at least `minimum_nprobes` + // partitions and stops there once k results are found. 
+ let nprobes = single_query.minimum_nprobes.max(1); + single_query.maximum_nprobes = Some(nprobes); + let (partitions, q_c_dists) = index.find_partitions(&single_query)?; + partitions_per_query.push(Arc::new(partitions)); + dists_per_query.push(Arc::new(q_c_dists)); + } + + let index_metrics: Arc = + Arc::new(metrics.index_metrics.clone()); + let pre_filter: Arc = pre_filter.clone(); + let per_query = index + .search_partitions_batch( + normalized, + partitions_per_query, + dists_per_query, + pre_filter, + index_metrics, + ) + .await?; + + for (query_index, batch) in per_query.into_iter().enumerate() { + let dists = batch.column(0).as_primitive::(); + let row_ids = batch.column(1).as_primitive::(); + candidates[query_index].extend( + dists + .values() + .iter() + .copied() + .zip(row_ids.values().iter().copied()), + ); + } + } + + // Per-query top-k merge across deltas, tagged with query_index. + let mut query_index_builder = Int32Builder::new(); + let mut distance_builder = Float32Builder::new(); + let mut row_id_builder = UInt64Builder::new(); + for (query_index, cands) in candidates.iter_mut().enumerate() { + cands.sort_by(|a, b| a.0.total_cmp(&b.0).then_with(|| a.1.cmp(&b.1))); + cands.truncate(query.k); + for (distance, row_id) in cands.iter() { + query_index_builder.append_value(query_index as i32); + distance_builder.append_value(*distance); + row_id_builder.append_value(*row_id); + } + } + let batch = RecordBatch::try_new( + result_schema, + vec![ + Arc::new(query_index_builder.finish()), + Arc::new(distance_builder.finish()), + Arc::new(row_id_builder.finish()), + ], + )?; + metrics.baseline_metrics.record_output(batch.num_rows()); + DataFusionResult::Ok(batch) + }; + + let stream = stream::once(fut).finally(move || { + metrics_clone.index_metrics.flush_io(); + metrics_clone + .baseline_metrics + .elapsed_compute() + .add_duration(timer.elapsed()); + metrics_clone.baseline_metrics.done(); + }); + Ok(Box::pin(RecordBatchStreamAdapter::new( + schema, + 
stream.boxed(), + ))) + } + + fn supports_limit_pushdown(&self) -> bool { + false + } +} + fn adjust_probes(query: &mut Query, pruned_nprobes: usize) { query.minimum_nprobes = query.minimum_nprobes.max(pruned_nprobes); if let Some(maximum) = query.maximum_nprobes