Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ uuid = { version = "1", features = ["serde", "v4"] }
regex = "1"
rayon = "1.5"
criterion = "0.5"
memmap2 = "0.4"
1 change: 1 addition & 0 deletions rust/crates/cloudsearch-index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ serde_json.workspace = true
tokio.workspace = true
tracing.workspace = true
rayon = { workspace = true }
memmap2.workspace = true

[dev-dependencies]
tempfile.workspace = true
Expand Down
99 changes: 77 additions & 22 deletions rust/crates/cloudsearch-index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,13 +206,34 @@ impl IndexCatalog {
segments_dir: &Path,
seg: &SegmentMeta,
) -> Option<cloudsearch_storage::suggest_index::SuggestReader> {
use cloudsearch_storage::suggest_index::SuggestReader;

let path = segments_dir.join(format!("suggest_{:020}.bin", seg.segment_number));
let reader = cloudsearch_storage::suggest_index::SuggestReader::from_bytes(
&tokio::fs::read(&path).await.ok()?,
)
.ok()?;

// Try mmap first (zero-copy, OS manages page cache)
if let Ok(file) = tokio::fs::File::open(&path).await {
let std_file = file.into_std().await;
let _ = std_file.sync_all(); // best-effort sync; errors don't prevent fallback
if let Ok(mmap) = unsafe { memmap2::Mmap::map(&std_file) }
Comment thread
coderabbitai[bot] marked this conversation as resolved.
&& let Ok(reader) = SuggestReader::from_mmap(mmap)
&& reader.fields().count() > 0
{
tracing::info!(
fields = reader.fields().count(),
"loaded suggest sidecar via mmap"
);
return Some(reader);
}
}

// Fallback: read into heap
let data = tokio::fs::read(&path).await.ok()?;
let reader = SuggestReader::from_bytes(&data).ok()?;
if reader.fields().count() > 0 {
tracing::info!(fields = reader.fields().count(), "loaded suggest sidecar");
tracing::info!(
fields = reader.fields().count(),
"loaded suggest sidecar via read"
);
Some(reader)
} else {
None
Expand All @@ -238,11 +259,14 @@ impl IndexCatalog {
seg: &SegmentMeta,
) -> Option<cloudsearch_storage::inverted_index::PositionsReader> {
let path = segments_dir.join(format!("positions_{:020}.bin", seg.segment_number));
let reader = cloudsearch_storage::inverted_index::PositionsReader::read(&path)
let reader = cloudsearch_storage::inverted_index::PositionsReader::read_mmap(&path)
.await
.ok()?;
if reader.term_count() > 0 {
tracing::info!(terms = reader.term_count(), "loaded positions sidecar");
tracing::info!(
terms = reader.term_count(),
"loaded positions sidecar via mmap"
);
Some(reader)
} else {
None
Expand Down Expand Up @@ -792,7 +816,7 @@ impl IndexHandle {
.segments_dir
.join(format!("positions_{new_segment_number:020}.bin"));
if let Ok(reader) =
cloudsearch_storage::inverted_index::PositionsReader::read(&positions_path).await
cloudsearch_storage::inverted_index::PositionsReader::read_mmap(&positions_path).await
{
tracing::info!(
terms = reader.term_count(),
Expand Down Expand Up @@ -1582,6 +1606,45 @@ impl IndexHandle {
Ok(refreshed_documents)
}

/// Load suggest sidecar for a single segment, if it exists (used by flush).
async fn load_single_suggest_reader(
segments_dir: &Path,
seg: &SegmentMeta,
) -> Option<cloudsearch_storage::suggest_index::SuggestReader> {
use cloudsearch_storage::suggest_index::SuggestReader;

let path = segments_dir.join(format!("suggest_{:020}.bin", seg.segment_number));

// Try mmap first (zero-copy, OS manages page cache)
if let Ok(file) = tokio::fs::File::open(&path).await {
let std_file = file.into_std().await;
let _ = std_file.sync_all(); // best-effort sync; errors don't prevent fallback
if let Ok(mmap) = unsafe { memmap2::Mmap::map(&std_file) }
&& let Ok(reader) = SuggestReader::from_mmap(mmap)
&& reader.fields().count() > 0
{
tracing::info!(
fields = reader.fields().count(),
"loaded suggest sidecar via mmap"
);
return Some(reader);
}
}

// Fallback: read into heap
let data = tokio::fs::read(&path).await.ok()?;
let reader = SuggestReader::from_bytes(&data).ok()?;
if reader.fields().count() > 0 {
tracing::info!(
fields = reader.fields().count(),
"loaded suggest sidecar via read"
);
Some(reader)
} else {
None
}
}

/// Persists all in-memory data to disk and rolls over the WAL.
///
/// # Errors
Expand Down Expand Up @@ -1668,7 +1731,8 @@ impl IndexHandle {
let positions_path = self
.segments_dir
.join(format!("positions_{segment_number:020}.bin"));
match cloudsearch_storage::inverted_index::PositionsReader::read(&positions_path).await
match cloudsearch_storage::inverted_index::PositionsReader::read_mmap(&positions_path)
.await
{
Ok(reader) if reader.term_count() > 0 => {
self.positions_readers.push(reader);
Expand Down Expand Up @@ -1701,20 +1765,11 @@ impl IndexHandle {
"wrote suggest sidecar"
);
// Reload all suggest readers from the updated manifest to avoid accumulation
let mut new_readers = Vec::new();
self.suggest_readers = Vec::new();
for seg in &self.manifest.segments {
let path = self
.segments_dir
.join(format!("suggest_{:020}.bin", seg.segment_number));
if let Ok(data) = tokio::fs::read(&path).await
&& let Ok(reader) =
cloudsearch_storage::suggest_index::SuggestReader::from_bytes(&data)
&& reader.fields().count() > 0
{
new_readers.push(Some(reader));
}
let reader = Self::load_single_suggest_reader(&self.segments_dir, seg).await;
self.suggest_readers.push(reader);
}
self.suggest_readers = new_readers;
tracing::info!(
count = self.suggest_readers.len(),
"reloaded suggest readers after flush"
Expand Down Expand Up @@ -1792,7 +1847,7 @@ impl IndexHandle {
.join(format!("positions_{new_segment_number:020}.bin"));
self.positions_readers.clear();
if let Ok(reader) =
cloudsearch_storage::inverted_index::PositionsReader::read(&positions_path).await
cloudsearch_storage::inverted_index::PositionsReader::read_mmap(&positions_path).await
&& reader.term_count() > 0
{
self.positions_readers.push(reader);
Expand Down
1 change: 1 addition & 0 deletions rust/crates/cloudsearch-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ version.workspace = true
chrono.workspace = true
cloudsearch-common = { path = "../cloudsearch-common" }
crc32c.workspace = true
memmap2.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
Expand Down
Loading