Skip to content

[CELEBORN-2313] Extend E2E checked zone to batch assembly point#3716

Open
xumingming wants to merge 1 commit into
apache:mainfrom
xumingming:extend-e2e-checked-zone-v2
Open

[CELEBORN-2313] Extend E2E checked zone to batch assembly point#3716
xumingming wants to merge 1 commit into
apache:mainfrom
xumingming:extend-e2e-checked-zone-v2

Conversation

@xumingming
Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Celeborn's E2E integrity check computes CRC_M inside ShuffleClientImpl.pushOrMergeData(), which runs in the async DataPusher thread. This leaves the segment from batch assembly in the writer thread through the DataPusher queue entirely outside the checked zone — meaning any corruption that occurs in that window is invisible to the integrity check and reaches reducers silently.

This change closes that gap and enables detection of a class of correctness bugs where data corruption occurs between batch assembly and async push dispatch, including bugs involving shared buffer pool references.

Introduce ShuffleClient.computeBatchCRC() and consolidate its invocation into two choke points:

  • DataPusher.addTask(): covers all async push paths. The CRC is recorded on the writer thread immediately before the buffer is enqueued, so DataPusher.pushData() intentionally uses the bare client.pushData() to avoid double-counting the same batch into CommitMetadata.

  • ShuffleClient.pushDataWithCRC() and ShuffleClient.mergeDataWithCRC(): new concrete convenience methods that call computeBatchCRC() then delegate to the abstract pushData()/mergeData(). These cover all synchronous push paths (pushGiantRecord, close() flush). The abstract pushData()/mergeData() are now documented as internal-use-only; all writer call sites across HashBasedShuffleWriter (spark-2/3), SortBasedShuffleWriter (spark-2/3), and SortBasedPusher use the WithCRC variants instead.

The now-redundant CRC computation inside pushOrMergeData() is removed.

This consolidation eliminates 7 scattered computeBatchCRC call sites that previously had to be manually paired with each push/merge call, reducing the risk of a future call site omitting the CRC step.

Why are the changes needed?

Enhance E2E Integrity Check, so it can cover more code path.

Does this PR resolve a correctness bug?

  • Yes

Does this PR introduce any user-facing change?

  • Yes

How was this patch tested?

Unit Test.

@xumingming
Copy link
Copy Markdown
Contributor Author

@gauravkm @RexXiong @SteNicholas Can you take a look at this one? This is a further polished version of #3670

The main change from the previous one is now computeBatchCRC is not scattered everywhere, it is embedded in pushDataWithCRC/mergeDataWithCRC/addTask. The main idea is from the review comments of previous PR, changed a little bit.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR extends Celeborn’s end-to-end shuffle integrity “checked zone” by moving CRC accumulation to the writer side (batch assembly / enqueue time) and centralizing CRC recording behind new *WithCRC convenience APIs. This closes the previously unchecked window between writer batch assembly and the async DataPusher dispatch queue.

Changes:

  • Add ShuffleClient.computeBatchCRC() plus pushDataWithCRC() / mergeDataWithCRC() wrappers to make CRC recording a consistent choke point for synchronous push/merge paths.
  • Record CRC in DataPusher.addTask() (writer thread) and remove the now-redundant CRC accumulation from ShuffleClientImpl.pushOrMergeData().
  • Update Spark shuffle writers/pushers to use the new *WithCRC APIs and add unit/integration-style tests validating CRC accumulation behavior.

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java Adds unit tests covering computeBatchCRC accumulation, disabled behavior, and attempt isolation.
client/src/main/java/org/apache/celeborn/client/write/DataPusher.java Records CRC on the writer thread before enqueue; avoids double-counting in async push thread.
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java Implements computeBatchCRC and removes CRC accumulation from pushOrMergeData().
client/src/main/java/org/apache/celeborn/client/ShuffleClient.java Introduces computeBatchCRC and pushDataWithCRC/mergeDataWithCRC convenience APIs + clarifies internal-use methods.
client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java Adds CRC tracking/state for tests and exposes per-partition captured batches.
client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/CelebornShuffleWriterSuiteBase.java Adds tests validating CRC/bytes accumulation matches pushed batches across mixed paths.
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java Uses pushDataWithCRC for giant-record synchronous pushes.
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java Uses pushDataWithCRC / mergeDataWithCRC for synchronous paths.
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java Uses pushDataWithCRC for giant-record synchronous pushes.
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java Uses pushDataWithCRC / mergeDataWithCRC for synchronous paths.
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java Uses mergeDataWithCRC for the merge path.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Celeborn's E2E integrity check computes CRC_M inside
`ShuffleClientImpl.pushOrMergeData()`, which runs in the async `DataPusher`
thread. This leaves the segment from batch assembly in the writer thread
through the `DataPusher` queue entirely outside the checked zone — meaning
any corruption that occurs in that window is invisible to the integrity check
and reaches reducers silently.

This change closes that gap and enables detection of a class of correctness
bugs where data corruption occurs between batch assembly and async push
dispatch, including bugs involving shared buffer pool references.

Introduce `ShuffleClient.computeBatchCRC()` and consolidate its invocation
into two choke points:

- `DataPusher.addTask()`: covers all async push paths. The CRC is recorded
  on the writer thread immediately before the buffer is enqueued, so
  `DataPusher.pushData()` intentionally uses the bare `client.pushData()`
  to avoid double-counting the same batch into CommitMetadata.

- `ShuffleClient.pushDataWithCRC()` and `ShuffleClient.mergeDataWithCRC()`:
  new concrete convenience methods that call `computeBatchCRC()` then
  delegate to the abstract `pushData()`/`mergeData()`. These cover all
  synchronous push paths (`pushGiantRecord`, `close()` flush). The abstract
  `pushData()`/`mergeData()` are now documented as internal-use-only;
  all writer call sites across `HashBasedShuffleWriter` (spark-2/3),
  `SortBasedShuffleWriter` (spark-2/3), and `SortBasedPusher` use the
  `WithCRC` variants instead.

The now-redundant CRC computation inside `pushOrMergeData()` is removed.

This consolidation eliminates 7 scattered `computeBatchCRC` call sites that
previously had to be manually paired with each push/merge call, reducing the
risk of a future call site omitting the CRC step.
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Comment thread client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Copy link
Copy Markdown
Member

@SteNicholas SteNicholas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for extending the checked zone up to the batch-assembly point — the direction is right and the writer-thread coverage gap is real. I reviewed the diff and cross-checked the call sites against main. The refactor looks solid; two correctness edges I'd like to confirm before merge, plus a few minor points.

Correctness — worth confirming

1. The mapperEnded check is no longer atomic with the push decision.
Previously the accumulation sat after the mapperEnded guard inside pushOrMergeData (guard at ShuffleClientImpl.java:1021, accumulation just below), so a batch was counted iff it was about to be pushed — atomically on the DataPusher thread. Now computeBatchCRC counts on the writer thread at addTask time, while the actual push re-checks mapperEnded later on the DataPusher thread and, on hit, calls pushState.cleanup() and returns 0.

So there's a window where mapperEnded is false at addTask (batch counted) but true at push (batch dropped + accumulated CRC wiped by cleanup()) — a write-vs-read divergence in either direction. In the normal single-attempt flow this can't happen (MapperEnd is only sent after the queue drains), but under speculation / revive, where another attempt ends the same (shuffleId, mapId), it looks reachable. Could you confirm this can't surface a false integrity failure (e.g., the losing attempt's metadata is never validated)?

2. Accumulation now races PushState.cleanup() across threads.
Before, all CRC accumulation happened on the DataPusher thread inside pushOrMergeData, single-threaded w.r.t. pushState.cleanup(). Now computeBatchCRC mutates PushState from the writer thread while the DataPusher thread can call pushState.cleanup() on the same instance. Individual adds are atomic, but add racing cleanup (clear) can lose an update. Is PushState's lifecycle safe under this new cross-thread access?

Completeness — looks good

Every shuffleClient.pushData(/shuffleClient.mergeData( writer call site is converted to a *WithCRC variant, and DataPusher keeps the single intentional bare pushData (CRC already recorded in addTask). CRC is still taken over uncompressed data (before compression in pushData), so no regression there.

Minor

  • DummyShuffleClient.getPushState changes from returning a fresh PushState each call to a cached one. Needed for the new tests, but it's a shared test double — worth a quick check that no existing test relied on the old "always fresh" behavior.
  • Javadoc: pushDataWithCRC says "Prefer this over pushData at all writer call sites," but DataPusher deliberately uses bare pushData. A one-line cross-reference would remove the apparent contradiction.
  • mergeData's "internal use only / async push pipeline" note is copied from pushData, but mergeData isn't driven by the async DataPusher — all its callers should use mergeDataWithCRC.

Tests

Good coverage (accumulation, disabled no-op, attempt-id isolation, giant+normal integration asserting crc-batch-count == push-call-count). The one gap maps to point #1: no test exercises mapperEnded flipping between addTask and the actual push — a targeted unit test there would de-risk the merge.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated no new comments.

Comments suppressed due to low confidence (2)

client/src/main/java/org/apache/celeborn/client/write/DataPusher.java:166

  • computeBatchCRC is executed before the task is successfully enqueued. If addTask exits early (e.g., checkException() throws while waiting for an idle PushTask, or an InterruptedException occurs), the batch will have been added to PushState commit metadata even though it never entered the async push pipeline, which can later trigger CommitMetadata mismatches when mapperEnd sends expected CRC/bytes.
  public void addTask(int partitionId, byte[] buffer, int size)
      throws IOException, InterruptedException {
    client.computeBatchCRC(shuffleId, mapId, attemptId, partitionId, buffer, 0, size);
    try {
      PushTask task = null;
      while (task == null) {
        checkException();
        task = idleQueue.poll(WAIT_TIME_NANOS, TimeUnit.NANOSECONDS);
      }
      task.setSize(size);
      task.setPartitionId(partitionId);
      System.arraycopy(buffer, 0, task.getBuffer(), 0, size);
      while (!dataPushQueue.addPushTask(task)) {

client/src/main/java/org/apache/celeborn/client/ShuffleClient.java:205

  • pushOrMergeData() no longer accumulates CommitMetadata when integrity check is enabled, so any call sites that still invoke pushData()/mergeData() directly (i.e. without going through DataPusher.addTask() or the new *WithCRC() helpers) will now send zero CRC/bytes at mapperEnd and can fail reducers with "CommitMetadata mismatch". There are still direct call sites in the repo (e.g. client-mr/mr/...CelebornSortBasedPusher.java uses pushData, and client-tez/tez/...CelebornTezWriter.java uses mergeData), so this change is a functional regression for those integrations when integrity check is enabled.
  /**
   * Write data to a specific reduce partition.
   *
   * <p><b>Internal use only.</b> Callers outside the async push pipeline (i.e. {@link
   * org.apache.celeborn.client.write.DataPusher}) should use {@link #pushDataWithCRC} instead,
   * which additionally records a CRC over the batch for end-to-end integrity checking.
   *
   * @param shuffleId the unique shuffle id of the application
   * @param mapId the map id of the shuffle
   * @param attemptId the attempt id of the map task, i.e. speculative task or task rerun for Apache

@xumingming
Copy link
Copy Markdown
Contributor Author

@SteNicholas Thanks for the review, the following is my response to each of the question you raised:

  1. The mapperEnded check is no longer atomic with the push decision.

The race between computeBatchCRC (writer thread) and pushOrMergeData (DataPusher thread) cannot cause a false integrity failure because:

  1. All three operations — computeBatchCRC, pushOrMergeData, and mapperEnd — are scoped to the same (shuffleId, mapId, attemptId) via PushState.
  2. If mapperEnded flips to true between CRC accumulation and the actual push, the current attempt is the losing attempt (another attempt won).
  3. The losing attempt never sends MapperEnd, so whatever CRC it accumulated (complete or partial) is never transmitted to the server and never validated.
  4. Only the winning attempt's MapperEnd — carrying its own full, correct CRC — is validated.
  1. Accumulation now races PushState.cleanup() across threads.

addTask modifies PushState.commitMetadataMap and PushState.cleanup() modifies PushState.inFlightRequestTracker, there is actually no races. Even if we clear PushState.commitMetadataMap in PushState.cleanup() in the future, it does not matter because when cleanup is called it means what ever accumulated in PushState is not needed anymore.

DummyShuffleClient.getPushState changes from returning a fresh PushState each call to a cached one.

DummyShuffleClient is used for tests, right? So as long as all tests pass, there will be no issue, right?

Javadoc: pushDataWithCRC says "Prefer this over pushData at all writer call sites," but DataPusher deliberately uses bare pushData. A one-line cross-reference would remove the apparent contradiction.

I have commented why we need to call pushData inside DataPusher, do we need more comment description?

mergeData's "internal use only / async push pipeline" note is copied from pushData, but mergeData isn't driven by the async DataPusher — all its callers should use mergeDataWithCRC.

Yes, you are right about this. The basic idea is that mergeData/pushData should only be used internally, do you want me to further optimize the comment?

The one gap maps to point #1: no test exercises mapperEnded flipping between addTask and the actual push — a targeted unit test there would de-risk the merge.

From the reasoning of the first answer, we can see that the MapperEnd event will never be sent for the losing mapper attempt, so it is guaranteed by current logic flow.

@xumingming
Copy link
Copy Markdown
Contributor Author

@SteNicholas Do you have further comments?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants