Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 61 additions & 19 deletions roaring/src/bitmap/store/array_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod visitor;
use crate::bitmap::store::array_store::visitor::{CardinalityCounter, VecWriter};
use core::cmp::Ordering;
use core::cmp::Ordering::*;
use core::convert::identity;
use core::fmt::{Display, Formatter};
use core::mem::size_of;
use core::ops::{BitAnd, BitAndAssign, BitOr, BitXor, RangeInclusive, Sub, SubAssign};
Expand Down Expand Up @@ -126,20 +127,32 @@ impl ArrayStore {

/// Inserts `index` into the store's backing vector at its sorted position.
///
/// Returns `true` when the value was absent and has been inserted, and
/// `false` when it was already present (the vector is left untouched).
#[inline]
pub fn insert(&mut self, index: u16) -> bool {
// NOTE(review): the next line looks like the pre-change one-liner kept by the
// diff view; the cfg-gated lines below perform the same lookup — confirm.
self.vec.binary_search(&index).map_err(|loc| self.vec.insert(loc, index)).is_err()
// Exactly one of the two `result` bindings is compiled: `vector::quad_search`
// when the `simd` feature is enabled, the slice binary search otherwise.
#[cfg(feature = "simd")]
let result = vector::quad_search(&self.vec, index);
#[cfg(not(feature = "simd"))]
let result = self.vec.binary_search(&index);
// `Err(loc)` carries the insertion point: insert there and report `true`.
// `Ok(_)` means the value already exists, so nothing is inserted.
result.map_err(|loc| self.vec.insert(loc, index)).is_err()
}

pub fn insert_range(&mut self, range: RangeInclusive<u16>) -> u64 {
let start = *range.start();
let end = *range.end();

// Figure out the starting/ending position in the vec.
let pos_start = self.vec.binary_search(&start).unwrap_or_else(|x| x);
let pos_end = pos_start
+ match self.vec[pos_start..].binary_search(&end) {
Ok(x) => x + 1,
Err(x) => x,
};
#[cfg(feature = "simd")]
let pos_start = vector::quad_search(&self.vec, start).unwrap_or_else(identity);
#[cfg(not(feature = "simd"))]
let pos_start = self.vec.binary_search(&start).unwrap_or_else(identity);

#[cfg(feature = "simd")]
let pos_end_result = vector::quad_search(&self.vec[pos_start..], end);
#[cfg(not(feature = "simd"))]
let pos_end_result = self.vec[pos_start..].binary_search(&end);

let pos_end = match pos_end_result {
Ok(x) => x + pos_start + 1,
Err(x) => x + pos_start,
};

// Overwrite the range in the middle - there's no need to take
// into account any existing elements between start and end, as
Expand Down Expand Up @@ -175,20 +188,34 @@ impl ArrayStore {
}

/// Removes `index` from the store's backing vector if it is present.
///
/// Returns `true` when the value was found and removed, `false` otherwise.
pub fn remove(&mut self, index: u16) -> bool {
// NOTE(review): the next line looks like the pre-change one-liner kept by the
// diff view; the cfg-gated lines below perform the same lookup — confirm.
self.vec.binary_search(&index).map(|loc| self.vec.remove(loc)).is_ok()
// Exactly one of the two `result` bindings is compiled, depending on
// whether the `simd` feature is enabled.
#[cfg(feature = "simd")]
let result = vector::quad_search(&self.vec, index);
#[cfg(not(feature = "simd"))]
let result = self.vec.binary_search(&index);

// `Ok(loc)` is the position of the value: remove it and report `true`.
result.map(|loc| self.vec.remove(loc)).is_ok()
}

/// Removes every stored value that falls inside the inclusive `range`.
///
/// Returns the number of values that were removed.
pub fn remove_range(&mut self, range: RangeInclusive<u16>) -> u64 {
let start = *range.start();
let end = *range.end();

// Figure out the starting/ending position in the vec.
// NOTE(review): the `pos_start`/`pos_end` computation immediately below looks
// like the pre-change version kept by the diff view; the cfg-gated lines
// that follow compute the same positions — confirm.
let pos_start = self.vec.binary_search(&start).unwrap_or_else(|x| x);
let pos_end = pos_start
+ match self.vec[pos_start..].binary_search(&end) {
Ok(x) => x + 1,
Err(x) => x,
};
// `pos_start` is the index of `start` when present, otherwise the index
// where it would be inserted; `identity` collapses `Ok` and `Err` alike.
#[cfg(feature = "simd")]
let pos_start = vector::quad_search(&self.vec, start).unwrap_or_else(identity);
#[cfg(not(feature = "simd"))]
let pos_start = self.vec.binary_search(&start).unwrap_or_else(identity);

// The search for `end` only looks at the tail beginning at `pos_start`,
// so the returned index is relative to that tail.
#[cfg(feature = "simd")]
let pos_end_result = vector::quad_search(&self.vec[pos_start..], end);
#[cfg(not(feature = "simd"))]
let pos_end_result = self.vec[pos_start..].binary_search(&end);

// Convert back to an absolute, exclusive end position: when `end` itself
// is stored (`Ok`) step one past it so that it is removed as well.
let pos_end = match pos_end_result {
Ok(x) => x + pos_start + 1,
Err(x) => x + pos_start,
};

// Drop the whole run in one pass and report how many elements it held.
self.vec.drain(pos_start..pos_end);
(pos_end - pos_start) as u64
}
Expand All @@ -203,7 +230,10 @@ impl ArrayStore {
}

/// Returns `true` when `index` is stored in the backing vector.
pub fn contains(&self, index: u16) -> bool {
// NOTE(review): the next line looks like the pre-change one-liner kept by the
// diff view; the cfg-gated returns below supersede it — confirm.
self.vec.binary_search(&index).is_ok()
// Exactly one of the two `return` statements is compiled: the
// `vector::quad_contains` lookup with the `simd` feature, the slice
// binary search without it.
#[cfg(feature = "simd")]
return vector::quad_contains(&self.vec, index);
#[cfg(not(feature = "simd"))]
return self.vec.binary_search(&index).is_ok();
}

pub fn contains_range(&self, range: RangeInclusive<u16>) -> bool {
Expand All @@ -213,13 +243,20 @@ impl ArrayStore {
if self.vec.len() < range_count {
return false;
}
let start_i = match self.vec.binary_search(&start) {

#[cfg(feature = "simd")]
let result = vector::quad_search(&self.vec, start);
#[cfg(not(feature = "simd"))]
let result = self.vec.binary_search(&start);

let start_i = match result {
Ok(i) => i,
Err(_) => return false,
};

// If there are `range_count` items, last item in the next range_count should be the
// expected end value, because this vec is sorted and has no duplicates
// If there are `range_count` items, last item in the next range_count
// should be the expected end value, because this vec is sorted and
// has no duplicates
self.vec.get(start_i + range_count - 1) == Some(&end)
}

Expand Down Expand Up @@ -301,7 +338,12 @@ impl ArrayStore {
}

pub fn rank(&self, index: u16) -> u64 {
match self.vec.binary_search(&index) {
#[cfg(feature = "simd")]
let result = vector::quad_search(&self.vec, index);
#[cfg(not(feature = "simd"))]
let result = self.vec.binary_search(&index);

match result {
Ok(i) => i as u64 + 1,
Err(i) => i as u64,
}
Expand Down
115 changes: 115 additions & 0 deletions roaring/src/bitmap/store/array_store/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,3 +548,118 @@ pub fn swizzle_to_front(val: u16x8, bitmask: u8) -> u16x8 {
let swizzled: u8x16 = val_convert.swizzle_dyn(swizzle_idxs);
u16x8::from_ne_bytes(swizzled)
}

#[inline]
pub fn quad_contains(slice: &[u16], val: u16) -> bool {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering whether I should introduce this function at all, since it is used only once, in the ArrayStore::contains method, or whether I should simply call quad_search(...).is_ok() there, given that quad_search already returns the position where the number is found or should be inserted.

const GAP: usize = u16x8::LEN * 2;

let (chunks, remaining) = slice.as_chunks::<GAP>();

if chunks.is_empty() {
return match remaining.iter().copied().find(|v| *v >= val) {
Some(v) => v == val,
None => false,
};
}

let num_blocks = chunks.len();
let mut base = 0;
let mut n = num_blocks;
while n > 3 {
let quarter = n >> 2; // equivalent to n / 4

let k1 = chunks[base + quarter][GAP - 1];
let k2 = chunks[base + 2 * quarter][GAP - 1];
let k3 = chunks[base + 3 * quarter][GAP - 1];

let c1 = (k1 < val) as usize;
let c2 = (k2 < val) as usize;
let c3 = (k3 < val) as usize;

base += (c1 + c2 + c3) * quarter;
n -= 3 * quarter;
}

while n > 1 {
let half = n >> 1; // equivalent to n / 2
base = if chunks[base + half][GAP - 1] < val { base + half } else { base };
n -= half;
}

let lo = if chunks[base][GAP - 1] < val { base + 1 } else { base };

if lo < num_blocks {
let ndl = u16x8::splat(val);
// I would love to work with arrays here...
let v0 = u16x8::from_slice(&chunks[lo][..GAP / 2]);
let v1 = u16x8::from_slice(&chunks[lo][GAP / 2..]);
return (v0.simd_eq(ndl) | v1.simd_eq(ndl)).any();
}

match slice.iter().copied().skip(num_blocks * GAP).find(|v| *v >= val) {
Some(v) => v == val,
None => false,
}
}

#[inline]
pub fn quad_search(slice: &[u16], val: u16) -> Result<usize, usize> {
Copy link
Copy Markdown
Member

@Dr-Emann Dr-Emann May 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least on my M1 Pro MacBook, quad_search always seems to lose to the plain Rust stdlib binary search.

Image

Copy link
Copy Markdown
Member Author

@Kerollmops Kerollmops May 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, have you tried benchmarking quad_contains? I ask because I designed quad_search to behave like the binary_search method and return the position where the item was found or must be inserted. I probably did it wrong or in an unoptimized way.

Thanks for the review anyway 😊

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

quad_contains also seems to lose to binary search. If I remove the bounds checks, it is competitive at smaller array sizes, but binary search wins again at larger array sizes.

Benchmark code at https://gist.github.com/Dr-Emann/558a3116f9cd2f984673ecaa73d76b61

Godbolt showing no panics in the unsafe bounds check removed implementation: https://rust.godbolt.org/z/369WxTqKo

image

All on an M1 Mac, interested if you get different results on x64.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a comment on the blog that seems relevant:

The remainder of the article uses the branchy std::binary_search as a baseline for comparison to branchless SIMD implementations, which is a poor representation of the performance difference between the scalar and SIMD algorithms.

The rust binary search implementation is really good.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lemire any interest in looking into whether we're missing anything important here?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Dr-Emann I got my AI to do a Rust port,

https://github.com/lemire/rustquadsimd

It even did the experiments...

https://github.com/lemire/rustquadsimd#results

I told the AI to keep things really simple.

const GAP: usize = u16x8::LEN * 2;

let (chunks, remaining) = slice.as_chunks::<GAP>();

if chunks.is_empty() {
return match remaining.iter().copied().enumerate().find(|(_, v)| *v >= val) {
Some((i, v)) if v == val => Ok(i),
Some((i, _)) => Err(i),
None => Err(slice.len()),
};
}

let num_blocks = chunks.len();
let mut base = 0;
let mut n = num_blocks;
while n > 3 {
let quarter = n >> 2; // equivalent to n / 4

let k1 = chunks[base + quarter][GAP - 1];
let k2 = chunks[base + 2 * quarter][GAP - 1];
let k3 = chunks[base + 3 * quarter][GAP - 1];

let c1 = (k1 < val) as usize;
let c2 = (k2 < val) as usize;
let c3 = (k3 < val) as usize;

base += (c1 + c2 + c3) * quarter;
n -= 3 * quarter;
}

while n > 1 {
let half = n >> 1; // equivalent to n / 2
base = if chunks[base + half][GAP - 1] < val { base + half } else { base };
n -= half;
}

let lo = if chunks[base][GAP - 1] < val { base + 1 } else { base };

if lo < num_blocks {
let ndl = u16x8::splat(val);
// I would love to work with arrays here...
let v0 = u16x8::from_slice(&chunks[lo][..GAP / 2]);
let v1 = u16x8::from_slice(&chunks[lo][GAP / 2..]);
Comment on lines +648 to +649
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This from_slice method panics if the slice is too small; we need to check the generated assembly to see whether the compiler can elide this redundant check.

let base_index = lo * GAP;
return match (v0.simd_ge(ndl).first_set(), v1.simd_ge(ndl).first_set()) {
(Some(i), _) if v0[i] == val => Ok(base_index + i),
(Some(i), _) => Err(base_index + i),
(_, Some(i)) if v1[i] == val => Ok(base_index + GAP / 2 + i),
(_, Some(i)) => Err(base_index + GAP / 2 + i),
(None, None) => Err(slice.len()),
};
Comment on lines +651 to +657
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering whether this is the best way to compute the position where the needle is, or should be, located, or whether there is something less expensive than two simd_ge calls plus two first_set calls.

}

match slice.iter().copied().enumerate().skip(num_blocks * GAP).find(|(_, v)| *v >= val) {
Some((i, v)) if v == val => Ok(i),
Some((i, _)) => Err(i),
None => Err(slice.len()),
}
}
Loading