Skip to content

[CELEBORN-2348] Support end-to-end shuffle integrity check for Flink#3718

Closed
SteNicholas wants to merge 2 commits into
apache:mainfrom
SteNicholas:CELEBORN-2348
Closed

[CELEBORN-2348] Support end-to-end shuffle integrity check for Flink#3718
SteNicholas wants to merge 2 commits into
apache:mainfrom
SteNicholas:CELEBORN-2348

Conversation

@SteNicholas
Copy link
Copy Markdown
Member

What changes were proposed in this pull request?

This PR extends the end-to-end shuffle integrity checks introduced in CELEBORN-894 (Spark-only) to Flink workloads, covering both the regular and the tiered (hybrid) read paths. When the check is enabled, the write side records a per subpartition CRC32 + byte count and the driver validates it against what the reader actually consumed, failing the read on a mismatch.

  • Write side: FlinkShuffleClientImpl hashes each push payload (the body after the batch header) into PushState via a zero-copy ByteBuffer view and reports the per-subpartition CRC32/bytes at MapperEnd, reusing the existing crc32PerPartition / bytesWrittenPerPartition plumbing. The constructor fails fast if the write-side BATCH_HEADER_SIZE ever diverges from the read-side BufferUtils.HEADER_LENGTH_PREFIX.
  • Read side: RemoteBufferStreamReader and CelebornChannelBufferReader accumulate the read CRC32/bytes through a shared ReadIntegrityTracker and report them at the last partition's stream end. The tracker owns the per-path framing/stripping and disables itself on any unexpected buffer shape (wrong component count, or a buffer shorter than the batch header) rather than risk a false mismatch.
  • Driver side: ReadReducerPartitionEnd is reused for MAP partitions, and MapPartitionCommitHandler.finishPartition combines the recorded write-side checksums over the consumed subpartition range, failing closed on a mismatch or missing metadata.
  • Add zero-copy ByteBuffer overloads to CelebornCRC32, CommitMetadata and PushState.
  • Minor: drop a stray (cosmetic, no-op) unary plus in handleReducerPartitionEnd's failure branch and update the client config doc.

Why are the changes needed?

CELEBORN-894 added end-to-end integrity verification only for Spark. Flink workloads — including hybrid/tiered shuffle — had no equivalent guard, so silent shuffle data corruption (bit flips, truncation, mis-framing) could go undetected and surface as wrong results rather than a failed task. This PR brings the same write-vs-read checksum/byte-count validation to Flink so such corruption fails the read instead of being silently consumed.

Does this PR resolve a correctness bug?

  • Yes

Does this PR introduce any user-facing change?

  • Yes

The existing celeborn.client.shuffle.integrityCheck.enabled config now also applies to Flink (previously Spark-only); its documentation is updated accordingly. The default remains false, so there is no behavior change unless the check is explicitly enabled.

How was this patch tested?

Added unit and integration tests:

  • CelebornCRC32Test / CommitMetadataTest: the new ByteBuffer overloads (single and split header/data), order-independence, and corruption / byte-count-mismatch detection.
  • MapPartitionCommitHandlerTest: finishPartition success and all failure branches (no metadata, missing map partition, out-of-bounds range, checksum mismatch, byte-count mismatch), concurrent recording, and the expired-shuffle race.
  • ReadIntegrityTrackerTest: report-once / disable semantics and per-path framing for both the regular and tiered read paths.
  • RemoteBufferStreamReaderTest: the stream-end-after-close race (a failed report must not notify the failure listener on a closed channel).
  • CelebornBufferStreamTest: the hasRemainingPartitions location-index boundary.
  • WordCountTest (WordCountTestWithIntegrityCheck) and HybridShuffleWordCountTest: end-to-end Flink runs with the check enabled, on both the regular and hybrid shuffle paths.

Extend the end-to-end integrity checks from CELEBORN-894 to Flink, on both
the regular and tiered (hybrid) read paths. When the check is enabled, the
write side records a per-subpartition CRC32/byte count and the driver
validates it against what the reader consumed, failing the read on a mismatch.

- Write side: FlinkShuffleClientImpl hashes each push payload into PushState
  and reports per-subpartition CRC32/bytes at MapperEnd.
- Read side: RemoteBufferStreamReader and CelebornChannelBufferReader
  accumulate the read CRC32/bytes via a shared ReadIntegrityTracker, reported
  at the last partition's stream end.
- Driver side: MapPartitionCommitHandler.finishPartition combines the recorded
  write-side checksums over the consumed range, failing closed on a mismatch.

Also add zero-copy ByteBuffer overloads to CelebornCRC32, CommitMetadata and
PushState, with unit and Flink integration tests.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@SteNicholas
Copy link
Copy Markdown
Member Author

SteNicholas commented Jun 3, 2026

Ping @gauravkm, @RexXiong, @pltbkd, @xumingming.

Copy link
Copy Markdown
Contributor

@xumingming xumingming left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good overall, some comments inline.

@pltbkd
Copy link
Copy Markdown
Contributor

pltbkd commented Jun 4, 2026

Overall LGTM. The only concern is that map partition reuses readReducerPartitionEnd, along with PbReadReducerPartitionEnd and READ_REDUCER_PARTITION_END_FAILED, which will probably be confusing in the future. I'd recommend renaming to the readPartitionEnd series, but there are possibly compatibility issues. WDYT?

@SteNicholas
Copy link
Copy Markdown
Member Author

@pltbkd thanks for the review! Agreed the reducer-centric naming reads oddly now that map partitions reuse it.

The good news is the rename would be wire-safe, because the protocol is keyed by numbers, not names:

  • MessageType.READ_REDUCER_PARTITION_END = 94 / _RESPONSE = 95 — protobuf encodes the message/enum by tag number, not by name, so renaming to a READ_PARTITION_END / PbReadPartitionEnd series while keeping 94/95 doesn't change the wire format.
  • StatusCode.READ_REDUCER_PARTITION_END_FAILED(56) — same story: the 56 is what goes on the wire, not the constant name.

And this RPC is client → LifecycleManager, which run at the same Celeborn version within a single application (unlike the worker↔master path), so there's no rolling-upgrade skew to worry about either. The source footprint is small as well (~5 files each for ReadReducerPartitionEnd / READ_REDUCER_PARTITION_END).

My only hesitation is mixing a protocol-wide rename (it also touches the existing Spark path) into this Flink feature PR. I'd lean toward doing the readPartitionEnd rename as a focused follow-up right after this merges, to keep this diff reviewable — but I'm happy to fold it in here if you'd prefer. WDYT?

…ition-type-agnostic

finishPartition is now reused for map partitions, so the trait doc no longer
applies only to reduce partitions. Reword it to cover both and point at
MapPartitionCommitHandler.finishPartition for the map-partition semantics.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@SteNicholas SteNicholas requested a review from xumingming June 4, 2026 08:59
Copy link
Copy Markdown
Contributor

@xumingming xumingming left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Copy Markdown
Contributor

@RexXiong RexXiong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@SteNicholas
Copy link
Copy Markdown
Member Author

Merged to main(v0.7.0).

@pltbkd
Copy link
Copy Markdown
Contributor

pltbkd commented Jun 5, 2026

@pltbkd thanks for the review! Agreed the reducer-centric naming reads oddly now that map partitions reuse it.

The good news is the rename would be wire-safe, because the protocol is keyed by numbers, not names:

  • MessageType.READ_REDUCER_PARTITION_END = 94 / _RESPONSE = 95 — protobuf encodes the message/enum by tag number, not by name, so renaming to a READ_PARTITION_END / PbReadPartitionEnd series while keeping 94/95 doesn't change the wire format.
  • StatusCode.READ_REDUCER_PARTITION_END_FAILED(56) — same story: the 56 is what goes on the wire, not the constant name.

And this RPC is client → LifecycleManager, which run at the same Celeborn version within a single application (unlike the worker↔master path), so there's no rolling-upgrade skew to worry about either. The source footprint is small as well (~5 files each for ReadReducerPartitionEnd / READ_REDUCER_PARTITION_END).

My only hesitation is mixing a protocol-wide rename (it also touches the existing Spark path) into this Flink feature PR. I'd lean toward doing the readPartitionEnd rename as a focused follow-up right after this merges, to keep this diff reviewable — but I'm happy to fold it in here if you'd prefer. WDYT?

Sorry for the late reply. Agreed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants