From b00df16e034b35e25f0fa5c780cf25b0bc26a270 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 30 Mar 2026 22:13:57 +0000 Subject: [PATCH 1/2] =?UTF-8?q?fix:=20remove=20halftone=20interpolation=20?= =?UTF-8?q?=E2=80=94=20all=2017=20bins=20from=20real=20data?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The halftone path only sampled 9 of 17 golden-step positions and interpolated the other 8 as neighbor averages. Those 8 bins carried no real signal — dead planes in the projection. Fix: stride octaves (skip every Nth octave) but sample ALL 17 golden-step positions per sampled octave. Every bin gets direct measurement from actual weight values. stride=16, 5120-col row: 19 octaves × 17 positions = 323 samples across all 17 bins (was 171 samples across 9 real + 8 fake bins). https://claude.ai/code/session_01HmdXNPit7QsTCfhJFef3Ee --- src/hpc/gguf_indexer.rs | 98 +++++++++++------------------------------ 1 file changed, 25 insertions(+), 73 deletions(-) diff --git a/src/hpc/gguf_indexer.rs b/src/hpc/gguf_indexer.rs index 35c01fc3..4a26c814 100644 --- a/src/hpc/gguf_indexer.rs +++ b/src/hpc/gguf_indexer.rs @@ -283,58 +283,28 @@ pub fn project_8rows_bf16_simd( use crate::simd::F64x8; let n_octaves = (n_cols + BASE_DIM - 1) / BASE_DIM; - let use_halftone = octave_stride > 1; let mut sums: [F64x8; BASE_DIM] = [F64x8::splat(0.0); BASE_DIM]; let mut counts: [u32; BASE_DIM] = [0; BASE_DIM]; - if use_halftone { - let mut octave = 0; - while octave < n_octaves { - for hi in 0..9 { - let col = octave * BASE_DIM + HALFTONE_POS[hi] as usize; - if col < n_cols { - let bin = HALFTONE_TO_BIN[hi] as usize; - let offsets: [usize; 8] = [ - row_starts[0] + col, row_starts[1] + col, - row_starts[2] + col, row_starts[3] + col, - row_starts[4] + col, row_starts[5] + col, - row_starts[6] + col, row_starts[7] + col, - ]; - sums[bin] += gather_bf16_x8(buf, &offsets); - counts[bin] += 1; - } - } - octave += octave_stride; - } - - // Interpolate odd 
bins from even neighbors (per-lane, still SIMD) - for odd in (1..BASE_DIM).step_by(2) { - let left = sums[odd - 1]; - let right = sums[(odd + 1) % BASE_DIM]; - let left_c = counts[odd - 1].max(1); - let right_c = counts[(odd + 1) % BASE_DIM].max(1); - let left_mean = left * F64x8::splat(1.0 / left_c as f64); - let right_mean = right * F64x8::splat(1.0 / right_c as f64); - sums[odd] = (left_mean + right_mean) * F64x8::splat(0.5); - counts[odd] = 1; - } - } else { - for octave in 0..n_octaves { - for bi in 0..BASE_DIM { - let col = octave * BASE_DIM + GOLDEN_POS[bi] as usize; - if col < n_cols { - let offsets: [usize; 8] = [ - row_starts[0] + col, row_starts[1] + col, - row_starts[2] + col, row_starts[3] + col, - row_starts[4] + col, row_starts[5] + col, - row_starts[6] + col, row_starts[7] + col, - ]; - sums[bi] += gather_bf16_x8(buf, &offsets); - counts[bi] += 1; - } + // All 17 golden-step positions per sampled octave. Stride skips octaves, + // NOT positions — every bin gets real data from actual weight values. 
+ let mut octave = 0; + while octave < n_octaves { + for bi in 0..BASE_DIM { + let col = octave * BASE_DIM + GOLDEN_POS[bi] as usize; + if col < n_cols { + let offsets: [usize; 8] = [ + row_starts[0] + col, row_starts[1] + col, + row_starts[2] + col, row_starts[3] + col, + row_starts[4] + col, row_starts[5] + col, + row_starts[6] + col, row_starts[7] + col, + ]; + sums[bi] += gather_bf16_x8(buf, &offsets); + counts[bi] += 1; } } + octave += octave_stride; } // Finalize: mean → scale → clamp → i16, all 8 lanes parallel @@ -365,39 +335,21 @@ pub fn project_8rows_bf16_simd( pub fn project_1row_bf16_strided(row: &[u16], octave_stride: usize) -> Base17 { let d = row.len(); let n_octaves = (d + BASE_DIM - 1) / BASE_DIM; - let use_halftone = octave_stride > 1; let mut sum = [0.0f64; BASE_DIM]; let mut count = [0u32; BASE_DIM]; - if use_halftone { - let mut octave = 0; - while octave < n_octaves { - for hi in 0..9 { - let col = octave * BASE_DIM + HALFTONE_POS[hi] as usize; - if col < d { - sum[HALFTONE_TO_BIN[hi] as usize] += bf16_to_f64(row[col]); - count[HALFTONE_TO_BIN[hi] as usize] += 1; - } - } - octave += octave_stride; - } - for odd in (1..BASE_DIM).step_by(2) { - let lc = count[odd - 1].max(1) as f64; - let rc = count[(odd + 1) % BASE_DIM].max(1) as f64; - sum[odd] = (sum[odd - 1] / lc + sum[(odd + 1) % BASE_DIM] / rc) * 0.5; - count[odd] = 1; - } - } else { - for octave in 0..n_octaves { - for bi in 0..BASE_DIM { - let col = octave * BASE_DIM + GOLDEN_POS[bi] as usize; - if col < d { - sum[bi] += bf16_to_f64(row[col]); - count[bi] += 1; - } + // All 17 positions per sampled octave — no halftone, all bins real + let mut octave = 0; + while octave < n_octaves { + for bi in 0..BASE_DIM { + let col = octave * BASE_DIM + GOLDEN_POS[bi] as usize; + if col < d { + sum[bi] += bf16_to_f64(row[col]); + count[bi] += 1; } } + octave += octave_stride; } let mut dims = [0i16; BASE_DIM]; From 4d09df099705298e0dd20a4b24195c0fd00e0ddf Mon Sep 17 00:00:00 2001 From: Claude Date: 
Tue, 31 Mar 2026 21:56:22 +0000 Subject: [PATCH 2/2] fix: HttpRangeReader stall detection, CDN re-resolve, HF API resolve MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause: 21 GB tensor reads stall silently when HuggingFace CDN drops the TCP connection. curl sits forever with no data. Fixes: - --speed-limit 100000 --speed-time 30: abort if < 100 KB/s for 30s - Re-resolve URL on 403 (CDN token expiry after ~1 hour) - Segment-aligned fetches (no overlapping reads on sequential access) - 6 retries (was 4) with capped exponential backoff (max 32s) - from_hf() constructor: resolves via huggingface_hub Python API first, falls back to curl HEAD, then HF REST API. Stores repo/filename for automatic re-resolution on token expiry. - resolve_hf_url() tries 3 methods: Python HF API → curl HEAD → REST API https://claude.ai/code/session_01HmdXNPit7QsTCfhJFef3Ee --- src/hpc/http_reader.rs | 298 +++++++++++++++++++++++++++++++---------- 1 file changed, 227 insertions(+), 71 deletions(-) diff --git a/src/hpc/http_reader.rs b/src/hpc/http_reader.rs index c55f62dd..9183e423 100644 --- a/src/hpc/http_reader.rs +++ b/src/hpc/http_reader.rs @@ -3,34 +3,35 @@ //! Enables streaming GGUF indexing directly from HuggingFace without //! downloading the full file to disk. Uses `Range: bytes=N-M` headers. //! -//! ```text -//! let reader = HttpRangeReader::new(url, total_size)?; -//! let gguf = gguf::read_gguf_header(&mut reader)?; // reads ~1 MB -//! for tensor in &gguf.tensors { -//! let data = gguf::read_tensor_f32(&mut reader, &gguf, &tensor)?; -//! // process tensor, then drop — bounded RAM -//! } -//! ``` +//! Features: +//! - Segment-aligned fetches (no overlapping reads) +//! - HuggingFace CDN URL resolution via `resolve_hf_url()` +//! - Stall detection via `--speed-limit` / `--speed-time` +//! - Retry with exponential backoff (6 retries per segment) +//! - Re-resolve URL on 403/redirect failure (CDN token expiry) +//! 
- LRU segment cache (192 MB at 64 MB segments) use std::io::{self, Read, Seek, SeekFrom}; use std::process::{Command, Stdio}; /// HTTP range reader that implements Read + Seek. /// -/// Internally splits chunks into smaller segments (default 64 MB) with -/// retry + exponential backoff. A mid-download failure only refetches -/// the failed segment, not the entire 256 MB chunk. +/// Internally splits reads into segment-aligned fetches (default 64 MB) +/// with retry + exponential backoff + stall detection. A mid-download +/// failure only refetches the failed segment, not the entire chunk. /// /// Caches the last `SEGMENT_CACHE_SIZE` segments so backward seeks /// within the cache window are free (no re-fetch). pub struct HttpRangeReader { url: String, + repo: Option, // for re-resolve on 403 + filename: Option, // for re-resolve on 403 position: u64, total_size: u64, chunk_size: usize, bytes_downloaded: u64, - // Segmented cache: each entry = (file_offset, data) + // Segmented cache: each entry = (segment_aligned_offset, data) segment_size: usize, segment_cache: Vec<(u64, Vec)>, max_cached_segments: usize, @@ -38,17 +39,21 @@ pub struct HttpRangeReader { // Current active segment (for Read trait) active_segment_start: u64, active_segment_len: usize, - active_segment_idx: Option, // index into segment_cache + active_segment_idx: Option, } /// Maximum retry attempts per segment. -const MAX_RETRIES: u32 = 4; +const MAX_RETRIES: u32 = 6; /// Initial backoff delay in milliseconds. -const INITIAL_BACKOFF_MS: u64 = 1000; +const INITIAL_BACKOFF_MS: u64 = 2000; /// Default segment size: 64 MB (4 segments per 256 MB chunk). const DEFAULT_SEGMENT_SIZE: usize = 64 * 1024 * 1024; /// Number of segments to cache (192 MB at 64 MB segments). const SEGMENT_CACHE_SIZE: usize = 3; +/// Minimum transfer speed before curl aborts (bytes/sec). 100 KB/s. +const SPEED_LIMIT: u32 = 100_000; +/// Seconds below SPEED_LIMIT before curl aborts the transfer. 
+const SPEED_TIME: u32 = 30; impl HttpRangeReader { /// Default chunk: 256 MB (fewer HTTP round-trips, fits in RAM easily). @@ -60,6 +65,8 @@ impl HttpRangeReader { pub fn new(url: String, total_size: u64) -> Self { Self { url, + repo: None, + filename: None, position: 0, total_size, chunk_size: Self::DEFAULT_CHUNK, @@ -75,10 +82,11 @@ impl HttpRangeReader { /// Create with custom chunk size. pub fn with_chunk_size(url: String, total_size: u64, chunk_size: usize) -> Self { - // Segment size = chunk_size / 4, minimum 16 MB let seg = (chunk_size / 4).max(16 * 1024 * 1024); Self { url, + repo: None, + filename: None, position: 0, total_size, chunk_size, @@ -92,39 +100,102 @@ impl HttpRangeReader { } } + /// Create from HuggingFace repo + filename. Resolves CDN URL and exact size. + /// + /// Uses `resolve_hf_url()` to get the final CDN URL and Content-Length. + /// Stores repo/filename for re-resolution on 403 (token expiry). + pub fn from_hf(repo: &str, filename: &str, chunk_size: usize) -> Result { + let (url, size) = resolve_hf_url(repo, filename)?; + let seg = (chunk_size / 4).max(16 * 1024 * 1024); + Ok(Self { + url, + repo: Some(repo.to_string()), + filename: Some(filename.to_string()), + position: 0, + total_size: size, + chunk_size, + bytes_downloaded: 0, + segment_size: seg, + segment_cache: Vec::with_capacity(SEGMENT_CACHE_SIZE), + max_cached_segments: SEGMENT_CACHE_SIZE, + active_segment_start: 0, + active_segment_len: 0, + active_segment_idx: None, + }) + } + /// Total bytes fetched from network. pub fn bytes_downloaded(&self) -> u64 { self.bytes_downloaded } - /// Fetch a segment with retry + exponential backoff. + /// Exact file size (from HEAD or HF API). + pub fn total_size(&self) -> u64 { + self.total_size + } + + /// Re-resolve the URL (e.g. after CDN token expiry). 
+ fn re_resolve_url(&mut self) -> io::Result<()> { + if let (Some(repo), Some(filename)) = (&self.repo, &self.filename) { + eprintln!(" re-resolving URL for {}/{}", repo, filename); + match resolve_hf_url(repo, filename) { + Ok((new_url, size)) => { + eprintln!(" resolved: {} ({} bytes)", new_url, size); + self.url = new_url; + if size > 0 && size != self.total_size { + eprintln!(" WARNING: size changed {} → {}", self.total_size, size); + } + Ok(()) + } + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), + } + } else { + Err(io::Error::new( + io::ErrorKind::Other, + "cannot re-resolve: no repo/filename stored (use from_hf())", + )) + } + } + + /// Align a position to segment boundaries. + fn segment_start_for(&self, pos: u64) -> u64 { + (pos / self.segment_size as u64) * self.segment_size as u64 + } + + /// Fetch a segment with retry + exponential backoff + stall detection. /// - /// Returns the fetched bytes. On permanent failure after MAX_RETRIES, returns error. + /// On 403 or repeated failure, attempts to re-resolve the URL (CDN token expiry). 
fn fetch_segment_with_retry(&mut self, start: u64, len: usize) -> io::Result> { let end = (start + len as u64 - 1).min(self.total_size - 1); let range = format!("{}-{}", start, end); let expected = (end - start + 1) as usize; + let mut resolved_this_call = false; for attempt in 0..MAX_RETRIES { if attempt > 0 { - let delay = INITIAL_BACKOFF_MS * (1u64 << (attempt - 1)); + let delay = INITIAL_BACKOFF_MS * (1u64 << (attempt - 1).min(4)); eprintln!(" retry {}/{} after {}ms (segment {}-{})", attempt + 1, MAX_RETRIES, delay, start, end); std::thread::sleep(std::time::Duration::from_millis(delay)); } + let speed_limit_str = SPEED_LIMIT.to_string(); + let speed_time_str = SPEED_TIME.to_string(); + let result = Command::new("curl") .args(&[ "-sL", - "--retry", "2", // curl-level retry for connection drops - "--retry-delay", "1", + "--retry", "2", + "--retry-delay", "2", "--connect-timeout", "30", - "--max-time", "300", // 5 min max per segment + "--max-time", "600", // 10 min max per 64 MB segment + "--speed-limit", &speed_limit_str, // abort if < 100 KB/s + "--speed-time", &speed_time_str, // for > 30 seconds "-r", &range, &self.url, ]) .stdout(Stdio::piped()) - .stderr(Stdio::null()) + .stderr(Stdio::piped()) .output(); match result { @@ -133,7 +204,6 @@ impl HttpRangeReader { return Ok(output.stdout); } Ok(output) if output.status.success() && !output.stdout.is_empty() => { - // Partial read — might be near EOF, accept it self.bytes_downloaded += output.stdout.len() as u64; if output.stdout.len() >= expected / 2 { return Ok(output.stdout); @@ -141,8 +211,26 @@ impl HttpRangeReader { eprintln!(" short read: got {}/{} bytes", output.stdout.len(), expected); } Ok(output) => { - eprintln!(" fetch failed: status={} got={} bytes", - output.status, output.stdout.len()); + let stderr = String::from_utf8_lossy(&output.stderr); + let code = output.status.code().unwrap_or(-1); + eprintln!(" fetch failed: exit={} got={} bytes stderr={}", + code, output.stdout.len(), stderr.trim()); 
+ + // 403 or curl exit 22 (HTTP error) → re-resolve CDN URL + if (code == 22 || stderr.contains("403") || stderr.contains("expired")) + && !resolved_this_call + { + if self.re_resolve_url().is_ok() { + resolved_this_call = true; + eprintln!(" URL re-resolved, retrying immediately"); + continue; + } + } + + // Curl exit 28 = timeout, 56 = recv failure → stall detected + if code == 28 || code == 56 { + eprintln!(" stall/timeout detected (curl exit {})", code); + } } Err(e) => { eprintln!(" curl error: {}", e); @@ -157,11 +245,16 @@ impl HttpRangeReader { } /// Find or fetch the segment containing `self.position`. + /// + /// Segments are aligned to `segment_size` boundaries to avoid + /// overlapping fetches when position advances sequentially. fn ensure_segment(&mut self) -> io::Result<()> { if self.position >= self.total_size { return Ok(()); } + let aligned_start = self.segment_start_for(self.position); + // Check if position is within any cached segment for (idx, (seg_start, seg_data)) in self.segment_cache.iter().enumerate() { let seg_end = *seg_start + seg_data.len() as u64; @@ -173,10 +266,10 @@ impl HttpRangeReader { } } - // Not cached — fetch new segment - let remaining = (self.total_size - self.position) as usize; + // Not cached — fetch aligned segment + let remaining = (self.total_size - aligned_start) as usize; let fetch_len = self.segment_size.min(remaining); - let data = self.fetch_segment_with_retry(self.position, fetch_len)?; + let data = self.fetch_segment_with_retry(aligned_start, fetch_len)?; let data_len = data.len(); // Evict oldest segment if cache is full @@ -184,10 +277,10 @@ impl HttpRangeReader { self.segment_cache.remove(0); } - self.segment_cache.push((self.position, data)); + self.segment_cache.push((aligned_start, data)); let idx = self.segment_cache.len() - 1; - self.active_segment_start = self.position; + self.active_segment_start = aligned_start; self.active_segment_len = data_len; self.active_segment_idx = Some(idx); @@ -247,63 
+340,103 @@ impl Seek for HttpRangeReader { } } -/// Resolve a HuggingFace model file URL and get its size. +/// Resolve a HuggingFace model file URL and get its exact size. +/// +/// Tries three methods in order: +/// 1. `huggingface_hub` Python API (most reliable — handles auth, gated models) +/// 2. curl HEAD with redirect follow (fast but may fail on gated repos) +/// 3. HuggingFace Hub REST API (no Python dependency) /// /// Returns (final_url, size_bytes). pub fn resolve_hf_url(repo: &str, filename: &str) -> Result<(String, u64), String> { - // Get redirect URL and Content-Length via curl HEAD + // Method 1: Python huggingface_hub (handles auth tokens, gated models) + if let Ok(py_out) = Command::new("python3") + .args(&["-c", &format!( + "from huggingface_hub import hf_hub_url, get_hf_file_metadata; \ + url = hf_hub_url('{}', '{}'); \ + meta = get_hf_file_metadata(url); \ + print(meta.size); print(meta.location if hasattr(meta, 'location') else url)", + repo, filename + )]) + .output() + { + if py_out.status.success() { + let text = String::from_utf8_lossy(&py_out.stdout); + let lines: Vec<&str> = text.lines().collect(); + if lines.len() >= 2 { + if let Ok(size) = lines[0].trim().parse::() { + if size > 0 { + let url = lines[1].trim().to_string(); + eprintln!(" resolved via HF API: {} bytes", size); + return Ok((url, size)); + } + } + } + } + } + + // Method 2: curl HEAD with redirect follow let url = format!( "https://huggingface.co/{}/resolve/main/{}", repo, filename ); - // Follow redirects, get final URL and size - let output = Command::new("curl") - .args(&["-sIL", &url]) + if let Ok(output) = Command::new("curl") + .args(&["-sIL", "--connect-timeout", "15", "--max-time", "30", &url]) .output() - .map_err(|e| format!("curl failed: {}", e))?; - - let headers = String::from_utf8_lossy(&output.stdout); - let mut size: u64 = 0; - let mut final_url = url.clone(); - - for line in headers.lines() { - if let Some(val) = line.strip_prefix("content-length: 
").or(line.strip_prefix("Content-Length: ")) { - if let Ok(s) = val.trim().parse::() { - size = s; + { + let headers = String::from_utf8_lossy(&output.stdout); + let mut size: u64 = 0; + let mut final_url = url.clone(); + + for line in headers.lines() { + let lower = line.to_lowercase(); + if lower.starts_with("content-length:") { + if let Some(val) = line.split(':').nth(1) { + if let Ok(s) = val.trim().parse::() { + size = s; + } + } + } + if lower.starts_with("location:") { + if let Some(val) = line.split_once(':').map(|(_, v)| v.trim()) { + if val.starts_with("http") { + final_url = val.to_string(); + } + } } } - if let Some(val) = line.strip_prefix("location: ").or(line.strip_prefix("Location: ")) { - final_url = val.trim().to_string(); - } - } - if size == 0 { - // Try python fallback - let py_out = Command::new("python3") - .args(&["-c", &format!( - "from huggingface_hub import hf_hub_url, get_hf_file_metadata; \ - url = hf_hub_url('{}', '{}'); \ - meta = get_hf_file_metadata(url); \ - print(meta.size); print(url)", - repo, filename - )]) - .output() - .map_err(|e| format!("python3 fallback failed: {}", e))?; - - let py_text = String::from_utf8_lossy(&py_out.stdout); - let lines: Vec<&str> = py_text.lines().collect(); - if lines.len() >= 2 { - size = lines[0].trim().parse().unwrap_or(0); - final_url = lines[1].trim().to_string(); + if size > 0 { + eprintln!(" resolved via curl HEAD: {} bytes", size); + return Ok((final_url, size)); } } - if size == 0 { - return Err("Could not determine file size".into()); + // Method 3: HuggingFace Hub REST API (no Python needed) + let api_url = format!( + "https://huggingface.co/api/models/{}/tree/main/{}", + repo, filename.rsplit('/').next().unwrap_or(filename) + ); + if let Ok(output) = Command::new("curl") + .args(&["-sL", "--connect-timeout", "15", "--max-time", "30", &api_url]) + .output() + { + let text = String::from_utf8_lossy(&output.stdout); + // Parse JSON-ish for "size": NNN + if let Some(pos) = 
text.find("\"size\":") { + let after = &text[pos + 7..]; + let num_str: String = after.chars().take_while(|c| c.is_ascii_digit()).collect(); + if let Ok(size) = num_str.parse::() { + if size > 0 { + eprintln!(" resolved via HF REST API: {} bytes", size); + return Ok((url, size)); + } + } + } } - Ok((final_url, size)) + Err(format!("Could not resolve {}/{}", repo, filename)) } #[cfg(test)] @@ -338,4 +471,27 @@ mod tests { let mut buf = [0u8; 10]; assert_eq!(r.read(&mut buf).unwrap(), 0); } + + #[test] + fn test_segment_alignment() { + let r = HttpRangeReader::new("http://example.com/test".into(), 1_000_000_000); + // Default segment = 64 MB = 67108864 + assert_eq!(r.segment_start_for(0), 0); + assert_eq!(r.segment_start_for(1000), 0); + assert_eq!(r.segment_start_for(67_108_864), 67_108_864); + assert_eq!(r.segment_start_for(67_108_865), 67_108_864); + assert_eq!(r.segment_start_for(134_217_728), 134_217_728); + } + + #[test] + #[ignore] // Requires network + fn test_resolve_hf_url() { + let (url, size) = resolve_hf_url( + "unsloth/Llama-4-Scout-17B-16E-Instruct-GGUF", + "BF16/Llama-4-Scout-17B-16E-Instruct-BF16-00005-of-00005.gguf", + ).expect("resolve_hf_url"); + assert!(size > 0, "size should be > 0"); + assert!(url.contains("http"), "url should be HTTP: {}", url); + eprintln!("resolved: {} ({} bytes)", url, size); + } }