diff --git a/src/hpc/http_reader.rs b/src/hpc/http_reader.rs
index 84250f3c..c55f62dd 100644
--- a/src/hpc/http_reader.rs
+++ b/src/hpc/http_reader.rs
@@ -17,19 +17,39 @@ use std::process::{Command, Stdio};
 
 /// HTTP range reader that implements Read + Seek.
 ///
-/// Each `read()` call fetches bytes via `curl -r start-end`.
-/// Buffered: fetches chunks of `chunk_size` to avoid per-byte HTTP calls.
+/// Internally splits chunks into smaller segments (default 64 MB) with
+/// retry + exponential backoff. A mid-download failure only refetches
+/// the failed segment, not the entire 256 MB chunk.
+///
+/// Caches the last `SEGMENT_CACHE_SIZE` segments so backward seeks
+/// within the cache window are free (no re-fetch).
 pub struct HttpRangeReader {
     url: String,
     position: u64,
    total_size: u64,
-    buffer: Vec<u8>,
-    buf_start: u64, // file offset where buffer starts
-    buf_len: usize, // valid bytes in buffer
     chunk_size: usize,
     bytes_downloaded: u64,
+
+    // Segmented cache: each entry = (file_offset, data)
+    segment_size: usize,
+    segment_cache: Vec<(u64, Vec<u8>)>,
+    max_cached_segments: usize,
+
+    // Current active segment (for Read trait)
+    active_segment_start: u64,
+    active_segment_len: usize,
+    active_segment_idx: Option<usize>, // index into segment_cache
 }
 
+/// Maximum retry attempts per segment.
+const MAX_RETRIES: u32 = 4;
+/// Initial backoff delay in milliseconds.
+const INITIAL_BACKOFF_MS: u64 = 1000;
+/// Default segment size: 64 MB (4 segments per 256 MB chunk).
+const DEFAULT_SEGMENT_SIZE: usize = 64 * 1024 * 1024;
+/// Number of segments to cache (192 MB at 64 MB segments).
+const SEGMENT_CACHE_SIZE: usize = 3;
+
 impl HttpRangeReader {
     /// Default chunk: 256 MB (fewer HTTP round-trips, fits in RAM easily).
     const DEFAULT_CHUNK: usize = 256 * 1024 * 1024;
@@ -42,25 +62,33 @@ impl HttpRangeReader {
             url,
             position: 0,
             total_size,
-            buffer: vec![0u8; Self::DEFAULT_CHUNK],
-            buf_start: 0,
-            buf_len: 0,
             chunk_size: Self::DEFAULT_CHUNK,
             bytes_downloaded: 0,
+            segment_size: DEFAULT_SEGMENT_SIZE,
+            segment_cache: Vec::with_capacity(SEGMENT_CACHE_SIZE),
+            max_cached_segments: SEGMENT_CACHE_SIZE,
+            active_segment_start: 0,
+            active_segment_len: 0,
+            active_segment_idx: None,
         }
     }
 
     /// Create with custom chunk size.
     pub fn with_chunk_size(url: String, total_size: u64, chunk_size: usize) -> Self {
+        // Segment size = chunk_size / 4, minimum 16 MB
+        let seg = (chunk_size / 4).max(16 * 1024 * 1024);
         Self {
             url,
             position: 0,
             total_size,
-            buffer: vec![0u8; chunk_size],
-            buf_start: 0,
-            buf_len: 0,
             chunk_size,
             bytes_downloaded: 0,
+            segment_size: seg,
+            segment_cache: Vec::with_capacity(SEGMENT_CACHE_SIZE),
+            max_cached_segments: SEGMENT_CACHE_SIZE,
+            active_segment_start: 0,
+            active_segment_len: 0,
+            active_segment_idx: None,
         }
     }
 
@@ -69,56 +97,100 @@ impl HttpRangeReader {
         self.bytes_downloaded
     }
 
-    /// Fetch a range of bytes from the URL via curl.
-    fn fetch_range(&mut self, start: u64, len: usize) -> io::Result<usize> {
+    /// Fetch a segment with retry + exponential backoff.
+    ///
+    /// Returns the fetched bytes. On permanent failure after MAX_RETRIES, returns an error.
+    fn fetch_segment_with_retry(&mut self, start: u64, len: usize) -> io::Result<Vec<u8>> {
         let end = (start + len as u64 - 1).min(self.total_size - 1);
         let range = format!("{}-{}", start, end);
+        let expected = (end - start + 1) as usize;
+
+        for attempt in 0..MAX_RETRIES {
+            if attempt > 0 {
+                let delay = INITIAL_BACKOFF_MS * (1u64 << (attempt - 1));
+                eprintln!(" retry {}/{} after {}ms (segment {}-{})",
+                    attempt + 1, MAX_RETRIES, delay, start, end);
+                std::thread::sleep(std::time::Duration::from_millis(delay));
+            }
 
-        let output = Command::new("curl")
-            .args(&["-sL", "-r", &range, &self.url])
-            .stdout(Stdio::piped())
-            .stderr(Stdio::null())
-            .output()
-            .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("curl failed: {}", e)))?;
-
-        if !output.status.success() {
-            return Err(io::Error::new(
-                io::ErrorKind::Other,
-                format!("curl returned status {}", output.status),
-            ));
-        }
-
-        let fetched = output.stdout.len();
-        if fetched == 0 {
-            return Ok(0);
+            let result = Command::new("curl")
+                .args(&[
+                    "-sL",
+                    "--retry", "2",            // curl-level retry for connection drops
+                    "--retry-delay", "1",
+                    "--connect-timeout", "30",
+                    "--max-time", "300",       // 5 min max per segment
+                    "-r", &range,
+                    &self.url,
+                ])
+                .stdout(Stdio::piped())
+                .stderr(Stdio::null())
+                .output();
+
+            match result {
+                Ok(output) if output.status.success() && output.stdout.len() == expected => {
+                    self.bytes_downloaded += output.stdout.len() as u64;
+                    return Ok(output.stdout);
+                }
+                Ok(output) if output.status.success() && !output.stdout.is_empty() => {
+                    // Partial read: might be near EOF, so accept it
+                    self.bytes_downloaded += output.stdout.len() as u64;
+                    if output.stdout.len() >= expected / 2 {
+                        return Ok(output.stdout);
+                    }
+                    eprintln!(" short read: got {}/{} bytes", output.stdout.len(), expected);
+                }
+                Ok(output) => {
+                    eprintln!(" fetch failed: status={} got={} bytes",
+                        output.status, output.stdout.len());
+                }
+                Err(e) => {
+                    eprintln!(" curl error: {}", e);
+                }
+            }
         }
 
-        // Copy to buffer
-        let copy_len = fetched.min(self.buffer.len());
-        self.buffer[..copy_len].copy_from_slice(&output.stdout[..copy_len]);
-        self.buf_start = start;
-        self.buf_len = copy_len;
-        self.bytes_downloaded += fetched as u64;
-
-        Ok(copy_len)
+        Err(io::Error::new(
+            io::ErrorKind::Other,
+            format!("segment fetch failed after {} retries: bytes {}-{}", MAX_RETRIES, start, end),
+        ))
     }
 
-    /// Ensure the buffer covers `self.position` and has data ready.
-    fn ensure_buffered(&mut self) -> io::Result<()> {
+    /// Find or fetch the segment containing `self.position`.
+    fn ensure_segment(&mut self) -> io::Result<()> {
         if self.position >= self.total_size {
             return Ok(());
         }
 
-        // Check if position is within current buffer
-        let buf_end = self.buf_start + self.buf_len as u64;
-        if self.position >= self.buf_start && self.position < buf_end {
-            return Ok(()); // already buffered
+        // Check if position is within any cached segment
+        for (idx, (seg_start, seg_data)) in self.segment_cache.iter().enumerate() {
+            let seg_end = *seg_start + seg_data.len() as u64;
+            if self.position >= *seg_start && self.position < seg_end {
+                self.active_segment_start = *seg_start;
+                self.active_segment_len = seg_data.len();
+                self.active_segment_idx = Some(idx);
+                return Ok(());
+            }
         }
 
-        // Need to fetch
+        // Not cached: fetch a new segment
         let remaining = (self.total_size - self.position) as usize;
-        let fetch_len = self.chunk_size.min(remaining);
-        self.fetch_range(self.position, fetch_len)?;
+        let fetch_len = self.segment_size.min(remaining);
+        let data = self.fetch_segment_with_retry(self.position, fetch_len)?;
+        let data_len = data.len();
+
+        // Evict oldest segment if cache is full
+        if self.segment_cache.len() >= self.max_cached_segments {
+            self.segment_cache.remove(0);
+        }
+
+        self.segment_cache.push((self.position, data));
+        let idx = self.segment_cache.len() - 1;
+
+        self.active_segment_start = self.position;
+        self.active_segment_len = data_len;
+        self.active_segment_idx = Some(idx);
+
         Ok(())
     }
 }
@@ -129,17 +201,23 @@ impl Read for HttpRangeReader {
             return Ok(0); // EOF
         }
 
-        self.ensure_buffered()?;
+        self.ensure_segment()?;
+
+        let idx = match self.active_segment_idx {
+            Some(i) if i < self.segment_cache.len() => i,
+            _ => return Ok(0),
+        };
 
-        let buf_offset = (self.position - self.buf_start) as usize;
-        let available = self.buf_len - buf_offset;
+        let (seg_start, ref seg_data) = self.segment_cache[idx];
+        let offset = (self.position - seg_start) as usize;
+        let available = seg_data.len() - offset;
         let to_copy = buf.len().min(available);
 
         if to_copy == 0 {
             return Ok(0);
         }
 
-        buf[..to_copy].copy_from_slice(&self.buffer[buf_offset..buf_offset + to_copy]);
+        buf[..to_copy].copy_from_slice(&seg_data[offset..offset + to_copy]);
         self.position += to_copy as u64;
         Ok(to_copy)
     }
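A minimal caller-side sketch of the new behavior, for reviewers. Everything here is illustrative: `read_object_header`, the 8-byte magic, and the caller-supplied `total_size` (obtained elsewhere, e.g. via a HEAD request) are hypothetical, and the `Seek` impl referenced in the struct's doc comment is assumed to exist unchanged outside this diff.

```rust
use std::io::{Read, Seek, SeekFrom};

// Hypothetical caller: `total_size` must be known up front (e.g. from a
// HEAD request performed elsewhere); this diff does not provide it.
fn read_object_header(url: String, total_size: u64) -> std::io::Result<[u8; 8]> {
    // A 256 MB chunk yields 64 MB segments (chunk_size / 4 in
    // with_chunk_size), so the 3-segment cache spans a 192 MB window.
    let mut reader = HttpRangeReader::with_chunk_size(url, total_size, 256 * 1024 * 1024);

    // The first read fetches the segment covering offset 0 via curl,
    // sleeping 1 s, 2 s, then 4 s between the up-to-MAX_RETRIES attempts.
    let mut magic = [0u8; 8];
    reader.read_exact(&mut magic)?;

    // A backward seek into a cached segment triggers no HTTP call:
    // ensure_segment() finds the segment in segment_cache.
    reader.seek(SeekFrom::Start(0))?;
    reader.read_exact(&mut magic)?;

    Ok(magic)
}
```

After both reads, `bytes_downloaded` reflects a single segment fetch, so the cache hit is directly observable in a test.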