Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 129 additions & 51 deletions src/hpc/http_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,39 @@ use std::process::{Command, Stdio};

/// HTTP range reader that implements Read + Seek.
///
/// Each `read()` call fetches bytes via `curl -r start-end`.
/// Buffered: fetches chunks of `chunk_size` to avoid per-byte HTTP calls.
/// Internally splits chunks into smaller segments (default 64 MB) with
/// retry + exponential backoff. A mid-download failure only refetches
/// the failed segment, not the entire 256 MB chunk.
///
/// Caches the last `SEGMENT_CACHE_SIZE` segments so backward seeks
/// within the cache window are free (no re-fetch).
pub struct HttpRangeReader {
    /// Source URL; byte ranges are fetched from it via `curl -r`.
    url: String,
    /// Current logical read position (absolute file offset).
    position: u64,
    /// Total size of the remote file in bytes.
    total_size: u64,
    /// Caller-requested fetch unit; segments never exceed this.
    chunk_size: usize,
    /// Running total of bytes fetched over the network (includes retries).
    bytes_downloaded: u64,

    // Segmented cache: each entry = (file_offset, data)
    segment_size: usize,
    segment_cache: Vec<(u64, Vec<u8>)>,
    max_cached_segments: usize,

    // Current active segment (for Read trait)
    active_segment_start: u64,
    active_segment_len: usize,
    active_segment_idx: Option<usize>, // index into segment_cache
}

/// Maximum retry attempts per segment before giving up with an error.
const MAX_RETRIES: u32 = 4;
/// Initial backoff delay in milliseconds; doubles on each retry (1s, 2s, 4s, ...).
const INITIAL_BACKOFF_MS: u64 = 1000;
/// Default segment size: 64 MB (4 segments per 256 MB chunk).
const DEFAULT_SEGMENT_SIZE: usize = 64 * 1024 * 1024;
/// Number of segments to cache (192 MB at 64 MB segments).
const SEGMENT_CACHE_SIZE: usize = 3;

impl HttpRangeReader {
/// Default chunk: 256 MB (fewer HTTP round-trips, fits in RAM easily).
const DEFAULT_CHUNK: usize = 256 * 1024 * 1024;
Expand All @@ -42,25 +62,33 @@ impl HttpRangeReader {
url,
position: 0,
total_size,
buffer: vec![0u8; Self::DEFAULT_CHUNK],
buf_start: 0,
buf_len: 0,
chunk_size: Self::DEFAULT_CHUNK,
bytes_downloaded: 0,
segment_size: DEFAULT_SEGMENT_SIZE,
segment_cache: Vec::with_capacity(SEGMENT_CACHE_SIZE),
max_cached_segments: SEGMENT_CACHE_SIZE,
active_segment_start: 0,
active_segment_len: 0,
active_segment_idx: None,
}
}

/// Create with custom chunk size.
pub fn with_chunk_size(url: String, total_size: u64, chunk_size: usize) -> Self {
// Segment size = chunk_size / 4, minimum 16 MB
let seg = (chunk_size / 4).max(16 * 1024 * 1024);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve caller chunk size in custom reader constructor

with_chunk_size no longer honors the caller-provided chunk size as the max fetch unit: it derives segment_size as chunk_size / 4 with a hard minimum of 16 MB, and read() always fetches by segment_size. For inputs that intentionally pass small chunk sizes (e.g. 1–8 MB to cap memory/request size), this now performs much larger network reads than requested and changes runtime memory/network behavior compared to the prior API contract.

Useful? React with 👍 / 👎.

Self {
url,
position: 0,
total_size,
buffer: vec![0u8; chunk_size],
buf_start: 0,
buf_len: 0,
chunk_size,
bytes_downloaded: 0,
segment_size: seg,
segment_cache: Vec::with_capacity(SEGMENT_CACHE_SIZE),
max_cached_segments: SEGMENT_CACHE_SIZE,
active_segment_start: 0,
active_segment_len: 0,
active_segment_idx: None,
}
}

Expand All @@ -69,56 +97,100 @@ impl HttpRangeReader {
self.bytes_downloaded
}

/// Fetch a range of bytes from the URL via curl.
fn fetch_range(&mut self, start: u64, len: usize) -> io::Result<usize> {
/// Fetch a segment with retry + exponential backoff.
///
/// Returns the fetched bytes. On permanent failure after MAX_RETRIES, returns error.
fn fetch_segment_with_retry(&mut self, start: u64, len: usize) -> io::Result<Vec<u8>> {
let end = (start + len as u64 - 1).min(self.total_size - 1);
let range = format!("{}-{}", start, end);
let expected = (end - start + 1) as usize;

for attempt in 0..MAX_RETRIES {
if attempt > 0 {
let delay = INITIAL_BACKOFF_MS * (1u64 << (attempt - 1));
eprintln!(" retry {}/{} after {}ms (segment {}-{})",
attempt + 1, MAX_RETRIES, delay, start, end);
std::thread::sleep(std::time::Duration::from_millis(delay));
}

let output = Command::new("curl")
.args(&["-sL", "-r", &range, &self.url])
.stdout(Stdio::piped())
.stderr(Stdio::null())
.output()
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("curl failed: {}", e)))?;

if !output.status.success() {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("curl returned status {}", output.status),
));
}

let fetched = output.stdout.len();
if fetched == 0 {
return Ok(0);
let result = Command::new("curl")
.args(&[
"-sL",
"--retry", "2", // curl-level retry for connection drops
"--retry-delay", "1",
"--connect-timeout", "30",
"--max-time", "300", // 5 min max per segment
"-r", &range,
&self.url,
])
.stdout(Stdio::piped())
.stderr(Stdio::null())
.output();

match result {
Ok(output) if output.status.success() && output.stdout.len() == expected => {
self.bytes_downloaded += output.stdout.len() as u64;
return Ok(output.stdout);
}
Ok(output) if output.status.success() && !output.stdout.is_empty() => {
// Partial read — might be near EOF, accept it
self.bytes_downloaded += output.stdout.len() as u64;
if output.stdout.len() >= expected / 2 {
return Ok(output.stdout);
}
eprintln!(" short read: got {}/{} bytes", output.stdout.len(), expected);
}
Ok(output) => {
eprintln!(" fetch failed: status={} got={} bytes",
output.status, output.stdout.len());
}
Err(e) => {
eprintln!(" curl error: {}", e);
}
}
}

// Copy to buffer
let copy_len = fetched.min(self.buffer.len());
self.buffer[..copy_len].copy_from_slice(&output.stdout[..copy_len]);
self.buf_start = start;
self.buf_len = copy_len;
self.bytes_downloaded += fetched as u64;

Ok(copy_len)
Err(io::Error::new(
io::ErrorKind::Other,
format!("segment fetch failed after {} retries: bytes {}-{}", MAX_RETRIES, start, end),
))
}

/// Ensure the buffer covers `self.position` and has data ready.
fn ensure_buffered(&mut self) -> io::Result<()> {
/// Find or fetch the segment containing `self.position`.
fn ensure_segment(&mut self) -> io::Result<()> {
if self.position >= self.total_size {
return Ok(());
}

// Check if position is within current buffer
let buf_end = self.buf_start + self.buf_len as u64;
if self.position >= self.buf_start && self.position < buf_end {
return Ok(()); // already buffered
// Check if position is within any cached segment
for (idx, (seg_start, seg_data)) in self.segment_cache.iter().enumerate() {
let seg_end = *seg_start + seg_data.len() as u64;
if self.position >= *seg_start && self.position < seg_end {
self.active_segment_start = *seg_start;
self.active_segment_len = seg_data.len();
self.active_segment_idx = Some(idx);
return Ok(());
}
}

// Need to fetch
// Not cached — fetch new segment
let remaining = (self.total_size - self.position) as usize;
let fetch_len = self.chunk_size.min(remaining);
self.fetch_range(self.position, fetch_len)?;
let fetch_len = self.segment_size.min(remaining);
let data = self.fetch_segment_with_retry(self.position, fetch_len)?;
let data_len = data.len();

// Evict oldest segment if cache is full
if self.segment_cache.len() >= self.max_cached_segments {
self.segment_cache.remove(0);
}

self.segment_cache.push((self.position, data));
let idx = self.segment_cache.len() - 1;

self.active_segment_start = self.position;
self.active_segment_len = data_len;
self.active_segment_idx = Some(idx);

Ok(())
}
}
Expand All @@ -129,17 +201,23 @@ impl Read for HttpRangeReader {
return Ok(0); // EOF
}

self.ensure_buffered()?;
self.ensure_segment()?;

let idx = match self.active_segment_idx {
Some(i) if i < self.segment_cache.len() => i,
_ => return Ok(0),
};

let buf_offset = (self.position - self.buf_start) as usize;
let available = self.buf_len - buf_offset;
let (seg_start, ref seg_data) = self.segment_cache[idx];
let offset = (self.position - seg_start) as usize;
let available = seg_data.len() - offset;
let to_copy = buf.len().min(available);

if to_copy == 0 {
return Ok(0);
}

buf[..to_copy].copy_from_slice(&self.buffer[buf_offset..buf_offset + to_copy]);
buf[..to_copy].copy_from_slice(&seg_data[offset..offset + to_copy]);
self.position += to_copy as u64;
Ok(to_copy)
}
Expand Down
Loading