
feat: column-major streaming Parquet reader primitive #6386

Open

g-talbot wants to merge 1 commit into main from gtt/streaming-parquet-reader

Conversation

g-talbot (Contributor) commented May 5, 2026

Summary

PR-4 of the streaming-merge stack, and the read-side counterpart of PR-2 (#6384):
it lands a primitive in quickwit-parquet-engine/src/storage/streaming_reader.rs
that takes a minimal RemoteByteSource trait and yields per-column-chunk Bytes in
storage order via exactly one footer GET + one body GET in the common path.

No production callers in this PR. PR-5 will add a legacy adapter for
multi-RG files that carry no qh.rg_partition_prefix_len claim; PR-6's
streaming merge engine consumes both shapes through the same trait;
PR-7 wires up ParquetMergeExecutor.

Caller contract

1. RemoteByteSource (caller-implemented; ~10 lines on top of Storage)
2. StreamingParquetReader::try_open(source, path)   → footer GET, parse metadata
3. .metadata()                                       → ParquetMetaData accessor
4. .next_column_chunk() repeatedly                   → first call issues the body GET
                                                       → returns ColumnChunkBytes in
                                                         storage order (RG-major,
                                                         col-major-in-RG)
                                                       → returns Ok(None) at EOF
                                                         (idempotent thereafter)
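
To make the contract concrete, here is a minimal sketch of how a caller might
drive the reader. The trait shape and exact signatures (async methods, anyhow
errors, a &str path, the import path) are assumptions layered on the description
above, not the PR's actual code:

```rust
use std::ops::Range;

use bytes::Bytes;
use tokio::io::AsyncRead;

// Import path assumed from the module location named in this PR.
use quickwit_parquet_engine::storage::streaming_reader::StreamingParquetReader;

/// Hypothetical shape of the caller-implemented trait (three methods, per the
/// "Layering decision" section); signatures are illustrative only.
#[async_trait::async_trait]
pub trait RemoteByteSource: Send + Sync + 'static {
    async fn file_size(&self) -> anyhow::Result<u64>;
    async fn get_slice(&self, range: Range<u64>) -> anyhow::Result<Bytes>;
    async fn get_slice_stream(
        &self,
        range: Range<u64>,
    ) -> anyhow::Result<Box<dyn AsyncRead + Send + Unpin>>;
}

async fn drain_all_chunks<S: RemoteByteSource>(source: S, path: &str) -> anyhow::Result<()> {
    // try_open issues the footer GET and parses ParquetMetaData up front.
    let mut reader = StreamingParquetReader::try_open(source, path).await?;
    println!("rows: {}", reader.metadata().file_metadata().num_rows());
    // The first next_column_chunk() call issues the single body GET; later calls
    // just advance through the streamed bytes in storage order.
    while let Some(chunk) = reader.next_column_chunk().await? {
        // chunk.bytes holds the on-disk compressed column chunk, cheap to clone.
        let _ = chunk.bytes.len();
    }
    Ok(()) // next_column_chunk() now returns Ok(None) on every further call
}
```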

ColumnChunkBytes::bytes is exactly the on-disk column chunk's
compressed_size bytes, ref-counted and cheap to clone. The merge
engine in PR-6 will wrap whole-file bytes (received from this reader
as a sequence of Bytes) for SerializedPageReader::new — that
constructor uses file-absolute offsets, so the natural integration is
"reader yields bytes; merge engine assembles them and decodes pages
with existing parquet APIs."
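
A hedged sketch of that integration, assuming the merge engine has already
reassembled whole-file bytes (the streamed chunks plus footer) into a single
Bytes buffer; parquet's ChunkReader impl for Bytes and SerializedPageReader
then decode pages at file-absolute offsets:

```rust
use std::sync::Arc;

use bytes::Bytes;
use parquet::column::page::PageReader;
use parquet::file::metadata::ColumnChunkMetaData;
use parquet::file::serialized_reader::SerializedPageReader;

/// Decode one column chunk's pages out of whole-file bytes. `full_file` is the
/// reassembled file; `num_rows` is the row group's row count. Sketch only;
/// PR-6's actual merge path may differ.
fn decode_column_chunk_pages(
    full_file: Bytes,
    col: &ColumnChunkMetaData,
    num_rows: usize,
) -> parquet::errors::Result<()> {
    // SerializedPageReader seeks with file-absolute offsets taken from `col`,
    // which is why whole-file bytes are the natural wrapper here.
    let mut pages = SerializedPageReader::new(Arc::new(full_file), col, num_rows, None)?;
    while let Some(page) = pages.get_next_page()? {
        let _ = page.num_values(); // hand the page to the merge/write path
    }
    Ok(())
}
```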

Layering decision

quickwit-parquet-engine deliberately does NOT take a
quickwit-storage dependency. Storage pulls in cloud-vendor SDKs that
this crate has no business linking against, and the layering would
invert (storage is a higher concern than the parquet data model).

Instead the reader takes a minimal RemoteByteSource trait — three
methods: file_size, get_slice, get_slice_stream. The merge
actor in quickwit-indexing will add a thin (~10-line) adapter to
quickwit_storage::Storage in PR-6; a rough sketch follows below.
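
For illustration, a possible shape of that PR-6 adapter; the
quickwit_storage::Storage method names and signatures used here are recalled
from the codebase and should be treated as assumptions:

```rust
use std::ops::Range;
use std::path::PathBuf;
use std::sync::Arc;

use bytes::Bytes;
use tokio::io::AsyncRead;

/// Hypothetical adapter from quickwit_storage::Storage to the reader's
/// RemoteByteSource trait (trait shape as sketched under "Caller contract").
struct StorageByteSource {
    storage: Arc<dyn quickwit_storage::Storage>,
    path: PathBuf,
}

#[async_trait::async_trait]
impl RemoteByteSource for StorageByteSource {
    async fn file_size(&self) -> anyhow::Result<u64> {
        Ok(self.storage.file_num_bytes(&self.path).await?)
    }

    async fn get_slice(&self, range: Range<u64>) -> anyhow::Result<Bytes> {
        let bytes = self
            .storage
            .get_slice(&self.path, range.start as usize..range.end as usize)
            .await?;
        Ok(Bytes::from(bytes.to_vec()))
    }

    async fn get_slice_stream(
        &self,
        range: Range<u64>,
    ) -> anyhow::Result<Box<dyn AsyncRead + Send + Unpin>> {
        Ok(self
            .storage
            .get_slice_stream(&self.path, range.start as usize..range.end as usize)
            .await?)
    }
}
```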

Page indexes

Not loaded into the parsed ParquetMetaData in PR-4. Rationale: page
indexes (offset_index / column_index) live in the file body, not the
footer; loading them either requires extra GETs or requires the body
GET range to extend earlier than the first column chunk. PR-6's
direct page copy reads column_chunk.offset_index_offset() /
offset_index_length() from the column metadata and decodes the
offset index from the body bytes lazily when it needs page boundaries
— no need to materialize the full ParquetOffsetIndex structure up
front.
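
As an illustration of that lazy decode, a hypothetical helper that locates a
column's offset index inside the already-fetched body bytes, using the
column-chunk metadata fields mentioned above (the helper name and calling
convention are assumptions):

```rust
use std::ops::Range;

use parquet::file::metadata::ColumnChunkMetaData;

/// Locate a column's offset index inside the body bytes, given that the body
/// GET started at `body_start` (the first column chunk's file offset). Returns
/// None when the column carries no offset index or it falls outside the
/// fetched range.
fn offset_index_range_in_body(
    col: &ColumnChunkMetaData,
    body_start: u64,
    body_len: usize,
) -> Option<Range<usize>> {
    let off = u64::try_from(col.offset_index_offset()?).ok()?;
    let len = usize::try_from(col.offset_index_length()?).ok()?;
    let start = usize::try_from(off.checked_sub(body_start)?).ok()?;
    let end = start.checked_add(len)?;
    (end <= body_len).then_some(start..end)
}
```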

This is purely a performance decision; apart from the absent page indexes,
the parsed metadata is identical to what
ParquetRecordBatchReaderBuilder::try_new(...).metadata() returns.

Footer prefetch

Configurable via StreamingReaderConfig::footer_prefetch_bytes,
defaulting to 256 KiB. If the configured prefetch turns out smaller than
the actual footer (rare; it would take thousands of columns or
extreme page counts), the reader transparently issues one retry GET
sized to the parser-reported length. The PR-A "two-GETs" contract
still holds in the common path; the extra GET only occurs when the
configured prefetch is undersized.
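
For reference, the retry decision boils down to the standard Parquet footer
layout: the last 8 bytes of the file are a 4-byte little-endian metadata
length followed by the PAR1 magic. A small sketch, independent of the
reader's actual internals:

```rust
/// Given the prefetched tail of the file, return Some(needed_len) when a
/// second, exactly-sized footer GET is required, or None when the prefetch
/// already covers the metadata (or the tail is not a valid Parquet footer,
/// which the reader surfaces as an error rather than a retry).
fn footer_retry_len(tail: &[u8]) -> Option<u64> {
    let n = tail.len();
    if n < 8 || !tail.ends_with(b"PAR1") {
        return None;
    }
    let metadata_len = u32::from_le_bytes(tail[n - 8..n - 4].try_into().unwrap()) as u64;
    let needed = metadata_len + 8; // metadata + 4-byte length word + 4-byte magic
    (needed > n as u64).then_some(needed)
}
```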

Tests (12)

PR-A (two-GETs):

  • test_footer_get_only_at_construction — only one slice GET, zero stream GETs
  • test_two_gets_for_full_consumption — exactly one slice + one stream after drain
  • test_body_get_starts_at_first_column_chunk_offset — body range = [first_col.offset .. last_col.end]

PR-B (metadata equivalence vs sync reader):

  • test_metadata_matches_sync_reader — schema, KV, sorting_columns, num_rows
  • (KV qh.* round-trip folded into above)

PR-C (bytes round-trip):

  • test_column_chunk_bytes_round_trip_single_rg — yielded bytes ≡ file slice at byte_range()
  • test_column_chunk_bytes_round_trip_multi_rg — same across multiple row groups
  • test_column_chunk_pages_decode_through_full_file — sanity-check that bytes feed SerializedPageReader cleanly

PR-D (order):

  • test_storage_order_advance — observed (rg, col) pairs lex-ordered, full coverage

Edge cases:

  • test_eof_idempotent
  • test_small_prefetch_retries_with_correct_size
  • test_truncated_file_returns_footer_too_large
  • test_out_of_order_yields_structured_error (defensive — public API can't normally hit it)
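
The two-GETs assertions above suggest a counting test double. A minimal sketch
of one (not the PR's test code), reusing the hypothetical RemoteByteSource
shape from the caller-contract sketch:

```rust
use std::ops::Range;
use std::sync::atomic::{AtomicUsize, Ordering};

use bytes::Bytes;
use tokio::io::AsyncRead;

/// Hypothetical test double: an in-memory byte source that counts GETs so the
/// "one slice GET + one stream GET" contract can be asserted after draining.
struct CountingSource {
    file: Bytes,
    slice_gets: AtomicUsize,
    stream_gets: AtomicUsize,
}

#[async_trait::async_trait]
impl RemoteByteSource for CountingSource {
    async fn file_size(&self) -> anyhow::Result<u64> {
        Ok(self.file.len() as u64)
    }

    async fn get_slice(&self, range: Range<u64>) -> anyhow::Result<Bytes> {
        self.slice_gets.fetch_add(1, Ordering::SeqCst);
        Ok(self.file.slice(range.start as usize..range.end as usize))
    }

    async fn get_slice_stream(
        &self,
        range: Range<u64>,
    ) -> anyhow::Result<Box<dyn AsyncRead + Send + Unpin>> {
        self.stream_gets.fetch_add(1, Ordering::SeqCst);
        let slice = self.file.slice(range.start as usize..range.end as usize);
        Ok(Box::new(std::io::Cursor::new(slice.to_vec())))
    }
}
```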

Position in stack

PR    Status        Description
PR-1  open (#6377)  Page-level stats + qh.rg_partition_prefix_len marker
PR-2  open (#6384)  Streaming column-major writer primitive
PR-4  this PR       Streaming column-major reader primitive
PR-3  next          Ingest writer cuts over to single-RG using PR-2
PR-5                Legacy multi-RG input adapter (uses same RemoteByteSource trait)
PR-6                Streaming merge engine; consumes PR-2 + PR-4
PR-7                Wire ParquetMergeExecutor; delete downloader

PR-4 is independent of PR-1 / PR-2 / PR-3 review — it branches off
main and shares no code with them.

Test plan

  • cargo +nightly fmt (on the touched files)
  • RUSTFLAGS="-Dwarnings --cfg tokio_unstable" cargo clippy -p quickwit-parquet-engine --tests
  • cargo doc -p quickwit-parquet-engine --no-deps
  • cargo machete
  • bash quickwit/scripts/check_license_headers.sh
  • bash quickwit/scripts/check_log_format.sh
  • cargo nextest run -p quickwit-parquet-engine --all-features — 368 tests, all pass

🤖 Generated with Claude Code

The reader issues exactly two streaming reads per input file in the common path:
one footer GET (via RemoteByteSource::get_slice) and one body GET
(via RemoteByteSource::get_slice_stream). Yields per-column-chunk
raw bytes in storage order (RG-major, col-major-in-RG) so callers
can decode pages with parquet's existing SerializedPageReader at
their own pace.

Symmetric with PR-2's StreamingParquetWriter. Together the two PRs
form the I/O substrate for PR-6's streaming column-major merge
engine and PR-7's wiring into ParquetMergeExecutor.

quickwit-parquet-engine deliberately does not depend on
quickwit-storage — that would invert the layering, since storage
pulls in cloud-vendor SDKs this crate has no business linking
against. Instead the reader takes a minimal RemoteByteSource trait
(file_size + get_slice + get_slice_stream); PR-6 will provide a
~10-line adapter from quickwit_storage::Storage in
quickwit-indexing.

Page indexes (offset_index/column_index) are NOT loaded into the
parsed ParquetMetaData. They live in the file body, not the
footer; loading them either requires extra GETs or extends the
body GET range to start before the first column chunk. PR-6's
direct page copy reads column_chunk.offset_index_offset/length and
decodes the index from the body bytes when it needs page
boundaries — no need to materialize the full ParquetOffsetIndex
structure up front.

12 tests cover the four contracts (PR-A two-GETs, PR-B metadata
equivalence vs sync reader, PR-C bytes round-trip on single-RG
and multi-RG files, PR-D storage-order advance), plus footer body
range correctness, KV metadata round-trip, EOF idempotency,
out-of-order error path, prefetch retry, and truncated-file
rejection.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
