diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 7a3616c..83e84ec 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -258,6 +258,7 @@ dependencies = [ "cloudsearch-storage", "crc32c", "criterion", + "memmap2", "rayon", "regex", "serde", @@ -290,6 +291,7 @@ dependencies = [ "chrono", "cloudsearch-common", "crc32c", + "memmap2", "serde", "serde_json", "tempfile", @@ -925,6 +927,15 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "memmap2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de5d3112c080d58ce560081baeaab7e1e864ca21795ddbf533d5b1842bb1ecf8" +dependencies = [ + "libc", +] + [[package]] name = "mime" version = "0.3.17" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index b49f535..4c6a41b 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -29,3 +29,4 @@ uuid = { version = "1", features = ["serde", "v4"] } regex = "1" rayon = "1.5" criterion = "0.5" +memmap2 = "0.4" diff --git a/rust/crates/cloudsearch-index/Cargo.toml b/rust/crates/cloudsearch-index/Cargo.toml index 33c5249..18a6bc2 100644 --- a/rust/crates/cloudsearch-index/Cargo.toml +++ b/rust/crates/cloudsearch-index/Cargo.toml @@ -14,6 +14,7 @@ serde_json.workspace = true tokio.workspace = true tracing.workspace = true rayon = { workspace = true } +memmap2.workspace = true [dev-dependencies] tempfile.workspace = true diff --git a/rust/crates/cloudsearch-index/src/lib.rs b/rust/crates/cloudsearch-index/src/lib.rs index bfd1c5b..4514908 100644 --- a/rust/crates/cloudsearch-index/src/lib.rs +++ b/rust/crates/cloudsearch-index/src/lib.rs @@ -206,13 +206,34 @@ impl IndexCatalog { segments_dir: &Path, seg: &SegmentMeta, ) -> Option { + use cloudsearch_storage::suggest_index::SuggestReader; + let path = segments_dir.join(format!("suggest_{:020}.bin", seg.segment_number)); - let reader = cloudsearch_storage::suggest_index::SuggestReader::from_bytes( - &tokio::fs::read(&path).await.ok()?, - ) - .ok()?; + + // Try mmap first (zero-copy, OS manages page cache) + if let Ok(file) = tokio::fs::File::open(&path).await { + let std_file = file.into_std().await; + let _ = std_file.sync_all(); // best-effort sync; errors don't prevent fallback + if let Ok(mmap) = unsafe { memmap2::Mmap::map(&std_file) } + && let Ok(reader) = SuggestReader::from_mmap(mmap) + && reader.fields().count() > 0 + { + tracing::info!( + fields = reader.fields().count(), + "loaded suggest sidecar via mmap" + ); + return Some(reader); + } + } + + // Fallback: read into heap + let data = tokio::fs::read(&path).await.ok()?; + let reader = SuggestReader::from_bytes(&data).ok()?; if reader.fields().count() > 0 { - tracing::info!(fields = reader.fields().count(), "loaded suggest sidecar"); + tracing::info!( + fields = reader.fields().count(), + "loaded suggest sidecar via read" + ); Some(reader) } else { None @@ -238,11 +259,14 @@ impl IndexCatalog { seg: &SegmentMeta, ) -> Option { let path = segments_dir.join(format!("positions_{:020}.bin", seg.segment_number)); - let reader = cloudsearch_storage::inverted_index::PositionsReader::read(&path) + let reader = cloudsearch_storage::inverted_index::PositionsReader::read_mmap(&path) .await .ok()?; if reader.term_count() > 0 { - tracing::info!(terms = reader.term_count(), "loaded positions sidecar"); + tracing::info!( + terms = reader.term_count(), + "loaded positions sidecar via mmap" + ); Some(reader) } else { None @@ -792,7 +816,7 @@ impl IndexHandle { .segments_dir .join(format!("positions_{new_segment_number:020}.bin")); if let Ok(reader) = - cloudsearch_storage::inverted_index::PositionsReader::read(&positions_path).await + cloudsearch_storage::inverted_index::PositionsReader::read_mmap(&positions_path).await { tracing::info!( terms = reader.term_count(), @@ -1582,6 +1606,45 @@ impl IndexHandle { Ok(refreshed_documents) } + /// Load suggest sidecar for a single segment, if it exists (used by flush). + async fn load_single_suggest_reader( + segments_dir: &Path, + seg: &SegmentMeta, + ) -> Option { + use cloudsearch_storage::suggest_index::SuggestReader; + + let path = segments_dir.join(format!("suggest_{:020}.bin", seg.segment_number)); + + // Try mmap first (zero-copy, OS manages page cache) + if let Ok(file) = tokio::fs::File::open(&path).await { + let std_file = file.into_std().await; + let _ = std_file.sync_all(); // best-effort sync; errors don't prevent fallback + if let Ok(mmap) = unsafe { memmap2::Mmap::map(&std_file) } + && let Ok(reader) = SuggestReader::from_mmap(mmap) + && reader.fields().count() > 0 + { + tracing::info!( + fields = reader.fields().count(), + "loaded suggest sidecar via mmap" + ); + return Some(reader); + } + } + + // Fallback: read into heap + let data = tokio::fs::read(&path).await.ok()?; + let reader = SuggestReader::from_bytes(&data).ok()?; + if reader.fields().count() > 0 { + tracing::info!( + fields = reader.fields().count(), + "loaded suggest sidecar via read" + ); + Some(reader) + } else { + None + } + } + /// Persists all in-memory data to disk and rolls over the WAL. /// /// # Errors @@ -1668,7 +1731,8 @@ impl IndexHandle { let positions_path = self .segments_dir .join(format!("positions_{segment_number:020}.bin")); - match cloudsearch_storage::inverted_index::PositionsReader::read(&positions_path).await + match cloudsearch_storage::inverted_index::PositionsReader::read_mmap(&positions_path) + .await { Ok(reader) if reader.term_count() > 0 => { self.positions_readers.push(reader); @@ -1701,20 +1765,11 @@ impl IndexHandle { "wrote suggest sidecar" ); // Reload all suggest readers from the updated manifest to avoid accumulation - let mut new_readers = Vec::new(); + self.suggest_readers = Vec::new(); for seg in &self.manifest.segments { - let path = self - .segments_dir - .join(format!("suggest_{:020}.bin", seg.segment_number)); - if let Ok(data) = tokio::fs::read(&path).await - && let Ok(reader) = - cloudsearch_storage::suggest_index::SuggestReader::from_bytes(&data) - && reader.fields().count() > 0 - { - new_readers.push(Some(reader)); - } + let reader = Self::load_single_suggest_reader(&self.segments_dir, seg).await; + self.suggest_readers.push(reader); } - self.suggest_readers = new_readers; tracing::info!( count = self.suggest_readers.len(), "reloaded suggest readers after flush" @@ -1792,7 +1847,7 @@ impl IndexHandle { .join(format!("positions_{new_segment_number:020}.bin")); self.positions_readers.clear(); if let Ok(reader) = - cloudsearch_storage::inverted_index::PositionsReader::read(&positions_path).await + cloudsearch_storage::inverted_index::PositionsReader::read_mmap(&positions_path).await && reader.term_count() > 0 { self.positions_readers.push(reader); diff --git a/rust/crates/cloudsearch-storage/Cargo.toml b/rust/crates/cloudsearch-storage/Cargo.toml index 37713b8..6f98133 100644 --- a/rust/crates/cloudsearch-storage/Cargo.toml +++ b/rust/crates/cloudsearch-storage/Cargo.toml @@ -7,6 +7,7 @@ version.workspace = true chrono.workspace = true cloudsearch-common = { path = "../cloudsearch-common" } crc32c.workspace = true +memmap2.workspace = true serde.workspace = true serde_json.workspace = true tokio.workspace = true diff --git a/rust/crates/cloudsearch-storage/src/inverted_index.rs b/rust/crates/cloudsearch-storage/src/inverted_index.rs index ec2f71b..f36f827 100644 --- a/rust/crates/cloudsearch-storage/src/inverted_index.rs +++ b/rust/crates/cloudsearch-storage/src/inverted_index.rs @@ -1,4 +1,5 @@ use std::collections::BTreeMap; +use std::sync::Arc; use tokio::io::AsyncReadExt; /// A term and its posting list (list of documents containing the term). @@ -71,20 +72,55 @@ impl InvertedIndex { } } +/// Errors that can occur when reading a positions.bin file. +#[derive(Debug)] +pub enum PositionsReaderError { + /// The file format is invalid (corrupt header, truncated data, etc.). + InvalidFormat(String), + /// An I/O error occurred (file not found, permission denied, mmap failed). + IoError(std::io::Error), +} + +impl std::fmt::Display for PositionsReaderError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::InvalidFormat(msg) => write!(f, "invalid positions file: {msg}"), + Self::IoError(err) => write!(f, "I/O error: {err}"), + } + } +} + +impl std::error::Error for PositionsReaderError {} + +impl From for PositionsReaderError { + fn from(err: std::io::Error) -> Self { + Self::IoError(err) + } +} + +impl From for PositionsReaderError { + fn from(s: std::string::String) -> Self { + Self::InvalidFormat(s) + } +} + /// Reads an inverted index from a positions.bin sidecar file. #[derive(Debug, Clone)] pub struct PositionsReader { /// Maps term string to its byte offset in the body section of the file. term_dict: BTreeMap, - /// Memory-mapped or loaded body section data. - body_data: Vec, + /// Body section data — either a heap allocation (Vec stored as Arc) or a slice into mmap'd memory (Arc reference). + body_data: Arc<[u8]>, + /// Backing memory map — kept to own the mapping. When None, `body_data` is a heap Vec. + #[allow(dead_code)] + mmap: Option>, } impl PositionsReader { const MAGIC: u32 = 0x50_4F_53_49; // "POSI" const VERSION: u8 = 1; - /// Read and parse a positions.bin file from disk. + /// Read and parse a positions.bin file from disk into heap-allocated Vec. /// /// # Errors /// Returns an error if the file format is invalid. @@ -98,6 +134,23 @@ impl PositionsReader { Self::from_bytes(&data).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e)) } + /// Memory-map a positions.bin file and parse it. + /// + /// Uses `Mmap::map()` (read-write mapping, safe since we only read) since sidecar + /// files are immutable after atomic rename. The backing `Mmap` is kept alive by the + /// returned struct's `mmap` field. + /// + /// # Errors + /// Returns an error if the file cannot be opened or mapped. + #[allow(clippy::must_use_candidate)] + pub async fn read_mmap(path: &std::path::Path) -> std::io::Result { + let file = tokio::fs::File::open(path).await?; + file.sync_all().await?; + let std_file = file.into_std().await; + let mmap = unsafe { memmap2::Mmap::map(&std_file) }?; + Self::from_mmap(mmap).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e)) + } + /// Parse from raw bytes. /// /// # Errors @@ -106,44 +159,57 @@ impl PositionsReader { /// # Panics /// Panics if the magic bytes don't match (corrupt file). #[allow(clippy::must_use_candidate)] - pub fn from_bytes(data: &[u8]) -> Result { + pub fn from_bytes(data: &[u8]) -> Result { if data.len() < 12 { - return Err("positions.bin file too short".to_string()); + return Err(PositionsReaderError::InvalidFormat( + "positions.bin file too short".to_string(), + )); } let magic = u32::from_le_bytes(data[..4].try_into().unwrap()); if magic != Self::MAGIC { - return Err(format!( + return Err(PositionsReaderError::InvalidFormat(format!( "invalid magic: expected {:x}, got {:x}", Self::MAGIC, magic - )); + ))); } let version = data[4]; if version != Self::VERSION { - return Err(format!("unsupported version: {version}")); + return Err(PositionsReaderError::InvalidFormat(format!( + "unsupported version: {version}" + ))); } let term_count = u32::from_le_bytes(data[8..12].try_into().unwrap()) as usize; let mut pos = 12; // Read term dictionary: (str_len[4], str[bytes], body_offset[8]) + // body_offset is stored as a relative offset from the start of the body section + // (the writer uses body.len() at the point each term is serialized). let mut term_dict = BTreeMap::new(); for _ in 0..term_count { if pos + 4 > data.len() { - return Err("truncated term dictionary".to_string()); + return Err(PositionsReaderError::InvalidFormat( + "truncated term dictionary".to_string(), + )); } let str_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize; pos += 4; if pos + str_len > data.len() { - return Err("truncated term string".to_string()); + return Err(PositionsReaderError::InvalidFormat( + "truncated term string".to_string(), + )); } - let term = String::from_utf8(data[pos..pos + str_len].to_vec()) - .map_err(|e| format!("invalid UTF-8 in term: {e}"))?; + let term = String::from_utf8(data[pos..pos + str_len].to_vec()).map_err(|e| { + PositionsReaderError::InvalidFormat(format!("invalid UTF-8 in term: {e}")) + })?; pos += str_len; if pos + 8 > data.len() { - return Err("truncated term entry".to_string()); + return Err(PositionsReaderError::InvalidFormat( + "truncated term entry".to_string(), + )); } let body_offset = u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()); pos += 8; @@ -151,11 +217,88 @@ impl PositionsReader { } // Everything from pos to end is the body section - let body_data = data[pos..].to_vec(); + let body_data = Arc::from(&data[pos..]); + + Ok(Self { + term_dict, + body_data, + mmap: None, + }) + } + + /// Parse from a memory-mapped file. + /// + /// The backing `Mmap` is kept alive by storing it in the returned struct. + /// Since sidecar files are immutable after atomic rename, a read-only mmap is safe. + /// + /// # Errors + /// Returns an error if the file format is invalid. + /// + /// # Panics + /// Panics if the magic bytes don't match (corrupt file). + #[allow(clippy::must_use_candidate)] + pub fn from_mmap(mmap: memmap2::Mmap) -> Result { + let data: &[u8] = &mmap; + if data.len() < 12 { + return Err(PositionsReaderError::InvalidFormat( + "positions.bin file too short".to_string(), + )); + } + + let magic = u32::from_le_bytes(data[..4].try_into().unwrap()); + if magic != Self::MAGIC { + return Err(PositionsReaderError::InvalidFormat(format!( + "invalid magic: expected {:x}, got {:x}", + Self::MAGIC, + magic + ))); + } + + let version = data[4]; + if version != Self::VERSION { + return Err(PositionsReaderError::InvalidFormat(format!( + "unsupported version: {version}" + ))); + } + + let term_count = u32::from_le_bytes(data[8..12].try_into().unwrap()) as usize; + let mut pos = 12; + + let mut term_dict = BTreeMap::new(); + for _ in 0..term_count { + if pos + 4 > data.len() { + return Err(PositionsReaderError::InvalidFormat( + "truncated term dictionary".to_string(), + )); + } + let str_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize; + pos += 4; + if pos + str_len > data.len() { + return Err(PositionsReaderError::InvalidFormat( + "truncated term string".to_string(), + )); + } + let term = String::from_utf8(data[pos..pos + str_len].to_vec()).map_err(|e| { + PositionsReaderError::InvalidFormat(format!("invalid UTF-8 in term: {e}")) + })?; + pos += str_len; + if pos + 8 > data.len() { + return Err(PositionsReaderError::InvalidFormat( + "truncated term entry".to_string(), + )); + } + let body_offset = u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()); + pos += 8; + term_dict.insert(term, body_offset); + } + + let body_data = Arc::from(&data[pos..]); + let mmap = Some(Arc::new(mmap)); Ok(Self { term_dict, body_data, + mmap, }) } @@ -247,106 +390,69 @@ mod tests { fn positions_reader_parses_valid_positions_file() { // Build a minimal binary positions file manually: // Header: MAGIC (4) + VERSION (1) + PADDING (3) + TERM_COUNT (4) = 12 bytes - // Term dict entry: str_len[4] + str[n] + body_offset[8] = 15 bytes for "hello" - // Body: doc_count[4] + postings (doc_id[8] + freq[4] + pos_count[4] + positions[n]) - // 1 posting with 2 positions = 24 bytes (4+8+4+4+8) - // File layout: [header=12][dict=15][body=24] = 51 bytes total - // body_offset=0 means "postings start at byte 0 of body section" - + // Term entry: str_len (4) + "test" (4) + body_offset (8) = 16 bytes + // Body section: doc_count (4) + posting = 20 bytes + // Total: 48 bytes, body starts at byte 28 let mut data = Vec::new(); - - // Header - data.extend_from_slice(&0x50_4F_53_49u32.to_le_bytes()); // MAGIC - data.push(1); // VERSION - data.extend_from_slice(&[0u8, 0u8, 0u8]); // padding + data.extend_from_slice(&0x50_4F_53_49_u32.to_le_bytes()); // MAGIC + data.push(1); // version + data.extend_from_slice(&[0u8, 0, 0]); // padding data.extend_from_slice(&1u32.to_le_bytes()); // term_count = 1 - // Term dict entry: term="hello", body_offset=0 - data.extend_from_slice(&5u32.to_le_bytes()); // str_len = 5 - data.extend_from_slice(b"hello"); - data.extend_from_slice(&0u64.to_le_bytes()); // body_offset = 0 (start of body section) - - // Body: postings for "hello" — doc_count=1, doc_id=0, freq=2, pos_count=2, positions=[0, 6] - data.extend_from_slice(&1u32.to_le_bytes()); // doc_count = 1 - data.extend_from_slice(&0u64.to_le_bytes()); // doc_id = 0 - data.extend_from_slice(&2u32.to_le_bytes()); // freq = 2 - data.extend_from_slice(&2u32.to_le_bytes()); // pos_count = 2 - data.extend_from_slice(&0u32.to_le_bytes()); // position 0 - data.extend_from_slice(&6u32.to_le_bytes()); // position 6 - - let reader = PositionsReader::from_bytes(&data).expect("parse positions file"); - let pl = reader.get("hello").expect("get term hello"); - assert_eq!(pl.docs.len(), 1); - assert_eq!(pl.docs[0].doc_id, 0); - assert_eq!(pl.docs[0].term_freq, 2); - assert_eq!(pl.docs[0].positions, vec![0, 6]); + // Term entry: str_len (4) + "test" (4) + body_offset (8) = 16 bytes + data.extend_from_slice(&4u32.to_le_bytes()); // str_len + data.extend_from_slice(b"test"); + data.extend_from_slice(&0u64.to_le_bytes()); // body_offset = 0 (relative to body section start) + + // Body section: doc_count (4) + posting = 20 bytes + data.extend_from_slice(&1u32.to_le_bytes()); // doc_count + data.extend_from_slice(&1u64.to_le_bytes()); // doc_id + data.extend_from_slice(&1u32.to_le_bytes()); // freq + data.extend_from_slice(&1u32.to_le_bytes()); // pos_count + data.extend_from_slice(&42u32.to_le_bytes()); // position + + let reader = PositionsReader::from_bytes(&data).expect("should parse"); + assert_eq!(reader.term_count(), 1); + let postings = reader.get("test").expect("should have postings"); + assert_eq!(postings.docs.len(), 1); + assert_eq!(postings.docs[0].doc_id, 1); + assert_eq!(postings.docs[0].positions, vec![42]); } #[test] - fn positions_reader_returns_none_for_missing_term() { + fn positions_reader_parses_multiple_terms_with_correct_offsets() { + // Two terms "foo" and "bar" with different body offsets + // Header: 12 bytes, Term dict: 2 entries = 42 bytes, Body: 28 bytes = 82 bytes total let mut data = Vec::new(); + data.extend_from_slice(&0x50_4F_53_49_u32.to_le_bytes()); // MAGIC + data.push(1); // version + data.extend_from_slice(&[0u8, 0, 0]); // padding + data.extend_from_slice(&2u32.to_le_bytes()); // term_count = 2 - // Header with one term entry - data.extend_from_slice(&0x50_4F_53_49u32.to_le_bytes()); - data.push(1); - data.extend_from_slice(&[0u8, 0u8, 0u8]); - data.extend_from_slice(&1u32.to_le_bytes()); - - // Term dict entry: term="hello", body_offset=0 - data.extend_from_slice(&3u32.to_le_bytes()); - data.extend_from_slice(b"hello"); - data.extend_from_slice(&0u64.to_le_bytes()); - - // Body: one posting for "hello" - let mut body = Vec::new(); - body.extend_from_slice(&1u32.to_le_bytes()); // doc_count = 1 - body.extend_from_slice(&0u64.to_le_bytes()); // doc_id = 0 - body.extend_from_slice(&1u32.to_le_bytes()); // freq = 1 - body.extend_from_slice(&1u32.to_le_bytes()); // pos_count = 1 - body.extend_from_slice(&0u32.to_le_bytes()); // position 0 - data.extend_from_slice(&body); - - let reader = PositionsReader::from_bytes(&data).expect("parse positions file"); - assert!(reader.get("nonexistent").is_none()); - assert!(reader.get("goodbye").is_none()); - } - - #[test] - fn positions_reader_handles_multiple_terms() { - let mut data = Vec::new(); - - // Header: term_count = 2 - data.extend_from_slice(&0x50_4F_53_49u32.to_le_bytes()); - data.push(1); - data.extend_from_slice(&[0u8, 0u8, 0u8]); - data.extend_from_slice(&2u32.to_le_bytes()); - - // Term dict entry 1: term="foo", body_offset=0 + // "foo" entry: str_len=3, "foo", body_offset=0 data.extend_from_slice(&3u32.to_le_bytes()); data.extend_from_slice(b"foo"); - data.extend_from_slice(&0u64.to_le_bytes()); + data.extend_from_slice(&0u64.to_le_bytes()); // offset 0 in body - // Term dict entry 2: term="bar", body_offset=24 - // (posting list for "foo" occupies 24 bytes: 4 + 8 + 4 + 4 + 4) + // "bar" entry: str_len=3, "bar", body_offset=24 data.extend_from_slice(&3u32.to_le_bytes()); data.extend_from_slice(b"bar"); - data.extend_from_slice(&24u64.to_le_bytes()); - - // Body: posting list for "foo" at offset 0 - let mut body = Vec::new(); - body.extend_from_slice(&1u32.to_le_bytes()); // doc_count = 1 - body.extend_from_slice(&0u64.to_le_bytes()); // doc_id = 0 - body.extend_from_slice(&1u32.to_le_bytes()); // freq = 1 - body.extend_from_slice(&1u32.to_le_bytes()); // pos_count = 1 - body.extend_from_slice(&5u32.to_le_bytes()); // position 5 - - // Posting list for "bar" at offset 24 - body.extend_from_slice(&1u32.to_le_bytes()); // doc_count = 1 - body.extend_from_slice(&1u64.to_le_bytes()); // doc_id = 1 - body.extend_from_slice(&1u32.to_le_bytes()); // freq = 1 - body.extend_from_slice(&1u32.to_le_bytes()); // pos_count = 1 - body.extend_from_slice(&10u32.to_le_bytes()); // position 10 - data.extend_from_slice(&body); + data.extend_from_slice(&24u64.to_le_bytes()); // offset 24 in body + + // Body section (28 bytes starting at byte 50): + // "foo" postings: doc_count=1, doc_id=1, freq=1, pos_count=1, pos=5 + data.extend_from_slice(&1u32.to_le_bytes()); // doc_count + data.extend_from_slice(&1u64.to_le_bytes()); // doc_id + data.extend_from_slice(&1u32.to_le_bytes()); // freq + data.extend_from_slice(&1u32.to_le_bytes()); // pos_count + data.extend_from_slice(&5u32.to_le_bytes()); // position + + // "bar" postings: doc_count=1, doc_id=1, freq=1, pos_count=1, pos=10 + data.extend_from_slice(&1u32.to_le_bytes()); // doc_count + data.extend_from_slice(&1u64.to_le_bytes()); // doc_id + data.extend_from_slice(&1u32.to_le_bytes()); // freq + data.extend_from_slice(&1u32.to_le_bytes()); // pos_count + data.extend_from_slice(&10u32.to_le_bytes()); // position let reader = PositionsReader::from_bytes(&data).expect("parse positions file"); assert_eq!(reader.term_count(), 2); @@ -362,32 +468,79 @@ mod tests { } #[test] - fn positions_reader_rejects_invalid_magic() { - let mut data = Vec::new(); + fn positions_reader_from_bytes_rejects_bad_magic() { + let mut data = vec![0u8; 12]; data.extend_from_slice(&0xDEAD_BEEFu32.to_le_bytes()); // invalid magic data.push(1); - data.extend_from_slice(&[0u8, 0u8, 0u8]); + data.extend_from_slice(&[0u8; 3]); data.extend_from_slice(&0u32.to_le_bytes()); let result = PositionsReader::from_bytes(&data); assert!(result.is_err()); let err = result.unwrap_err(); assert!( - err.contains("invalid magic"), + format!("{err}").contains("invalid magic"), "expected magic error, got: {err}" ); } #[test] - fn positions_reader_rejects_truncated_data() { + fn positions_reader_from_bytes_rejects_truncated_data() { // Header only, no term dict let mut data = Vec::new(); - data.extend_from_slice(&0x50_4F_53_49u32.to_le_bytes()); + data.extend_from_slice(&0x50_4F_53_49_u32.to_le_bytes()); data.push(1); - data.extend_from_slice(&[0u8, 0u8, 0u8]); + data.extend_from_slice(&[0u8; 3]); data.extend_from_slice(&1u32.to_le_bytes()); // term_count = 1 but no dict follows let result = PositionsReader::from_bytes(&data); assert!(result.is_err()); } + + #[test] + fn positions_reader_get_returns_none_for_missing_term() { + let mut data = Vec::new(); + data.extend_from_slice(&0x50_4F_53_49_u32.to_le_bytes()); + data.push(1); + data.extend_from_slice(&[0u8; 3]); + data.extend_from_slice(&0u32.to_le_bytes()); // no terms + + let reader = PositionsReader::from_bytes(&data).expect("should parse"); + assert!(reader.get("missing").is_none()); + } + + #[tokio::test] + async fn positions_reader_read_mmap_loads_from_file() { + use std::io::Write; + + // Build a minimal positions file: header (12) + term dict (16) + body (20) = 48 bytes + let mut data = Vec::new(); + data.extend_from_slice(&0x50_4F_53_49_u32.to_le_bytes()); // MAGIC + data.push(1); // version + data.extend_from_slice(&[0u8, 0, 0]); // padding + data.extend_from_slice(&1u32.to_le_bytes()); // term_count = 1 + data.extend_from_slice(&4u32.to_le_bytes()); // str_len + data.extend_from_slice(b"test"); + data.extend_from_slice(&0u64.to_le_bytes()); // body_offset = 0 (relative to body section) + data.extend_from_slice(&1u32.to_le_bytes()); // doc_count + data.extend_from_slice(&1u64.to_le_bytes()); // doc_id + data.extend_from_slice(&1u32.to_le_bytes()); // freq + data.extend_from_slice(&1u32.to_le_bytes()); // pos_count + data.extend_from_slice(&42u32.to_le_bytes()); // position + + // Write to temp file and sync so mmap sees data + let mut tmpfile = tempfile::NamedTempFile::new().expect("temp file"); + tmpfile.write_all(&data).expect("write"); + tmpfile.flush().expect("flush"); + + let reader = PositionsReader::read_mmap(tmpfile.path()) + .await + .expect("read_mmap should load the file"); + + assert_eq!(reader.term_count(), 1); + let postings = reader.get("test").expect("get term test"); + assert_eq!(postings.docs.len(), 1); + assert_eq!(postings.docs[0].doc_id, 1); + assert_eq!(postings.docs[0].positions, vec![42]); + } } diff --git a/rust/crates/cloudsearch-storage/src/suggest_index.rs b/rust/crates/cloudsearch-storage/src/suggest_index.rs index d12e9e2..91ab2fc 100644 --- a/rust/crates/cloudsearch-storage/src/suggest_index.rs +++ b/rust/crates/cloudsearch-storage/src/suggest_index.rs @@ -4,6 +4,7 @@ //! and m = number of matching terms. use std::collections::BTreeMap; +use std::sync::Arc; /// MAGIC bytes for suggest sidecar file: "SUGG" in ASCII. const SUGGEST_MAGIC: u32 = 0x5355_4747; @@ -52,6 +53,8 @@ impl SuggestIndex { pub struct SuggestReader { /// Per-field sorted term arrays. fields: BTreeMap>, + /// Backing memory map — kept to own the mapping. When None, data was heap-allocated. + _mmap: Option>, } impl SuggestReader { @@ -61,7 +64,6 @@ impl SuggestReader { /// Returns an error if the data is corrupted or has an invalid header. /// /// # Panics - /// /// Panics if the data is malformed (e.g., invalid UTF-8, truncated bytes). pub fn from_bytes(data: &[u8]) -> std::io::Result { let mut offset = 0usize; @@ -138,7 +140,25 @@ impl SuggestReader { fields.insert(field_name, entries); } - Ok(Self { fields }) + Ok(Self { + fields, + _mmap: None, + }) + } + + /// Creates a suggest reader from a memory-mapped file. + /// + /// The backing `Mmap` is kept alive by storing it in the returned struct. + /// Since sidecar files are immutable after atomic rename, a read-only mmap is safe. + /// + /// # Errors + /// Returns an error if the file format is invalid. + pub fn from_mmap(mmap: memmap2::Mmap) -> std::io::Result { + let result = Self::from_bytes(&mmap)?; + Ok(Self { + fields: result.fields, + _mmap: Some(Arc::new(mmap)), + }) } /// Returns the sorted entries for a specific field. @@ -275,12 +295,20 @@ mod tests { ] } + fn make_reader(fields: std::collections::BTreeMap>) -> SuggestReader { + SuggestReader { + fields, + _mmap: None, + } + } + #[test] fn find_first_prefix_finds_exact_match() { let entries = make_entries(); - let reader = SuggestReader { - fields: std::collections::BTreeMap::from([("title".to_string(), entries)]), - }; + let reader = make_reader(std::collections::BTreeMap::from([( + "title".to_string(), + entries, + )])); // "elastic" exists assert_eq!(reader.find_first_prefix("title", "elastic"), Some(0)); @@ -295,9 +323,10 @@ mod tests { #[test] fn find_first_prefix_returns_none_for_non_matching_prefix() { let entries = make_entries(); - let reader = SuggestReader { - fields: std::collections::BTreeMap::from([("title".to_string(), entries)]), - }; + let reader = make_reader(std::collections::BTreeMap::from([( + "title".to_string(), + entries, + )])); // No term starts with "z" assert_eq!(reader.find_first_prefix("title", "z"), None); @@ -307,9 +336,7 @@ mod tests { #[test] fn find_first_prefix_returns_none_for_empty_entries() { - let reader = SuggestReader { - fields: std::collections::BTreeMap::new(), - }; + let reader = make_reader(std::collections::BTreeMap::new()); assert_eq!(reader.find_first_prefix("title", "elastic"), None); } @@ -317,9 +344,10 @@ mod tests { #[test] fn find_first_prefix_returns_none_for_missing_field() { let entries = make_entries(); - let reader = SuggestReader { - fields: std::collections::BTreeMap::from([("title".to_string(), entries)]), - }; + let reader = make_reader(std::collections::BTreeMap::from([( + "title".to_string(), + entries, + )])); assert_eq!(reader.find_first_prefix("body", "elastic"), None); } @@ -327,9 +355,10 @@ mod tests { #[test] fn suggest_for_field_iterates_correctly() { let entries = make_entries(); - let reader = SuggestReader { - fields: std::collections::BTreeMap::from([("title".to_string(), entries)]), - }; + let reader = make_reader(std::collections::BTreeMap::from([( + "title".to_string(), + entries, + )])); let suggestions = reader.suggest_for_field("title", "elast"); @@ -341,9 +370,10 @@ mod tests { #[test] fn suggest_for_field_returns_empty_for_no_match() { let entries = make_entries(); - let reader = SuggestReader { - fields: std::collections::BTreeMap::from([("title".to_string(), entries)]), - }; + let reader = make_reader(std::collections::BTreeMap::from([( + "title".to_string(), + entries, + )])); let suggestions = reader.suggest_for_field("title", "z"); @@ -353,9 +383,10 @@ mod tests { #[test] fn suggest_for_field_stops_at_prefix_boundary() { let entries = make_entries(); - let reader = SuggestReader { - fields: std::collections::BTreeMap::from([("title".to_string(), entries)]), - }; + let reader = make_reader(std::collections::BTreeMap::from([( + "title".to_string(), + entries, + )])); // "elast" should only match "elastic" and "elasticsearch", not "kubernetes" let suggestions = reader.suggest_for_field("title", "elast"); @@ -411,9 +442,10 @@ mod tests { #[test] fn suggest_for_field_returns_empty_for_empty_prefix() { let entries = make_entries(); - let reader = SuggestReader { - fields: std::collections::BTreeMap::from([("title".to_string(), entries)]), - }; + let reader = make_reader(std::collections::BTreeMap::from([( + "title".to_string(), + entries, + )])); // Empty prefix should return no results, not the entire vocabulary let suggestions = reader.suggest_for_field("title", ""); @@ -422,4 +454,52 @@ mod tests { "empty prefix must not return all terms" ); } + + #[test] + fn suggest_reader_from_mmap_round_trip() { + use std::io::Write; + + let index = SuggestIndex { + fields: std::collections::BTreeMap::from([( + "title".to_string(), + vec![ + SuggestEntry { + term: "elastic".to_string(), + doc_freq: 10, + score: 0.5, + }, + SuggestEntry { + term: "elasticsearch".to_string(), + doc_freq: 5, + score: 0.25, + }, + ], + )]), + }; + + let data = index.to_bytes(); + + // Write to temp file + let mut tmpfile = tempfile::NamedTempFile::new().expect("temp file"); + tmpfile.write_all(&data).expect("write"); + tmpfile.flush().expect("flush"); + + // Load via mmap + let std_file = std::fs::File::open(tmpfile.path()).expect("open file"); + let mmap = unsafe { memmap2::Mmap::map(&std_file) }.expect("mmap"); + let from_mmap_reader = SuggestReader::from_mmap(mmap).expect("from_mmap"); + + // Compare with from_bytes + let from_bytes_reader = SuggestReader::from_bytes(&data).expect("from_bytes"); + + assert_eq!( + from_bytes_reader.fields.len(), + from_mmap_reader.fields.len() + ); + let title_from_mmap = from_mmap_reader.get_field("title").expect("title field"); + let title_from_bytes = from_bytes_reader.get_field("title").expect("title field"); + assert_eq!(title_from_mmap.len(), title_from_bytes.len()); + assert_eq!(title_from_mmap[0].term, "elastic"); + assert!((title_from_mmap[0].score - 0.5).abs() < 1e-6); + } }