Skip to content

[CELEBORN-2329][CIP22] Encryption at Rest Spark Impl#3689

Open
akpatnam25 wants to merge 3 commits into
apache:mainfrom
akpatnam25:ear-spark-impl
Open

[CELEBORN-2329][CIP22] Encryption at Rest Spark Impl#3689
akpatnam25 wants to merge 3 commits into
apache:mainfrom
akpatnam25:ear-spark-impl

Conversation

@akpatnam25
Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Adds EAR support for Spark side.
See more details in doc.

Why are the changes needed?

See more details in doc.

Does this PR resolve a correctness bug?

no

Does this PR introduce any user-facing change?

no

How was this patch tested?

unit tests and tested in production internally.

@akpatnam25 akpatnam25 marked this pull request as ready for review May 15, 2026 21:08
@akpatnam25
Copy link
Copy Markdown
Contributor Author

+CC @mridulm @rmcyang @FMX @SteNicholas PTAL, thanks.

@codecov
Copy link
Copy Markdown

codecov Bot commented May 15, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 66.98%. Comparing base (b4cb5a0) to head (0cf8e57).
⚠️ Report is 54 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3689      +/-   ##
==========================================
+ Coverage   66.91%   66.98%   +0.08%     
==========================================
  Files         358      359       +1     
  Lines       21986    22218     +232     
  Branches     1946     1968      +22     
==========================================
+ Hits        14710    14881     +171     
- Misses       6262     6315      +53     
- Partials     1014     1022       +8     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds Spark-side “encryption at rest” support by introducing a pluggable crypto interface in the Celeborn client, wiring encryption into push paths and decryption into fetch/read paths, and providing a Spark-backed implementation that uses Spark’s IO encryption settings/key.

Changes:

  • Introduces CryptoHandler and threads an optional handler through ShuffleClient/ShuffleClientImpl into push (encrypt) and read (decrypt) code paths.
  • Adds Spark implementation (SparkCryptoHandler) + plumbing (SparkCommonUtils.getCryptoHandler, Spark shuffle readers/managers updated to pass the handler).
  • Updates tests and shaded artifacts/dependencies to include the needed crypto runtime.

Reviewed changes

Copilot reviewed 20 out of 20 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
client/src/test/java/org/apache/celeborn/client/read/CelebornInputStreamPeerFailoverTest.java Updates test calls to new CelebornInputStream.create signature with optional crypto handler.
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java Encrypts outgoing shuffle payloads when crypto is enabled; passes handler into partition reads.
client/src/main/java/org/apache/celeborn/client/ShuffleClient.java Adds ShuffleClient.get(...) overload accepting an optional crypto handler; exposes setup hook.
client/src/main/java/org/apache/celeborn/client/security/CryptoHandler.java New interface for encrypt/decrypt on byte arrays.
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java Decrypts incoming batches before optional decompression and consumption.
client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java Adds no-op crypto handler setup for dummy client.
client/pom.xml Adds commons-crypto dependency to client module.
client-spark/spark-3/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReaderSuite.scala Updates static mock to match new ShuffleClient.get signature.
client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala Extends constructors/plumbing to carry optional crypto handler to ShuffleClient.
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java Updates reflective columnar shuffle reader construction to pass optional crypto handler.
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java Passes Spark-derived crypto handler into writers/readers.
client-spark/spark-3-shaded/pom.xml Ensures commons-crypto is included in shaded Spark 3 artifact.
client-spark/spark-3-columnar-shuffle/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornColumnarShuffleReaderSuite.scala Updates tests to pass Optional.empty() crypto handler.
client-spark/spark-3-columnar-shuffle/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornColumnarShuffleReader.scala Threads optional crypto handler through columnar reader.
client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala Threads optional crypto handler through Spark 2 reader.
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java Passes Spark-derived crypto handler into Spark 2 writers/readers.
client-spark/spark-2-shaded/pom.xml Ensures commons-crypto is included in shaded Spark 2 artifact.
client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SparkCryptoHandlerSuiteJ.java Adds unit tests for Spark crypto handler behavior.
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SparkCryptoHandler.java Implements CryptoHandler using Spark CryptoStreamUtils.
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SparkCommonUtils.java Derives optional crypto handler from Spark config/env IO encryption key.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@RexXiong
Copy link
Copy Markdown
Contributor

RexXiong commented May 18, 2026

PR Review: [CELEBORN-2329][CIP22] Encryption at Rest Spark Impl

Overall: Good architecture, some performance and safety concerns.

The encrypt-at-client / decrypt-at-client approach is clean — workers store ciphertext, no key distribution needed to workers. Compress-then-encrypt ordering is correct.


Performance Concerns

1. Heavy allocation on hot path — every batch allocates multiple objects

SparkCryptoHandler.encrypt() and decrypt() each create:

  • ByteArrayOutputStream/InputStream
  • DataOutputStream/InputStream
  • CryptoStream (which internally allocates IV, cipher state)
  • Output byte[] array

For a typical shuffle with thousands of batches per second per executor, this generates significant GC pressure. Consider:

  • Reusing crypto streams via ThreadLocal (reset with new IV per call)
  • Using a buffer pool for the output arrays
  • Or at minimum, pre-sizing ByteArrayOutputStream with length + overhead to avoid internal resizing

Moderate Issues

2. commons-crypto dependency added to generic client module

Non-Spark engines (Flink, MR) using the Celeborn client now pull in commons-crypto even if they never use encryption. Consider making this dependency optional in the client pom and keeping it non-optional only in the spark-shaded modules.

3. getCryptoHandler called multiple times per reader/writer creation

SparkCommonUtils.getCryptoHandler(conf)  // called in getWriter
SparkCommonUtils.getCryptoHandler(conf)  // called in getReader (multiple times for columnar path)

Each call accesses SparkEnv.get() and securityManager().getIOEncryptionKey(). While cheap, it creates a new SparkCryptoHandler instance each time. Since the key doesn't change within an app, this should be cached (e.g., at the SparkShuffleManager level).


Minor / Nits

4. No AES-GCM / integrity protection

Spark's CryptoStreamUtils uses AES-CTR by default which provides confidentiality but not integrity. A compromised worker could tamper with ciphertext without detection. This is a pre-existing Spark limitation (not introduced by this PR) but worth documenting in the CIP as a known gap.

5. Test coverage is good — roundtrip, wrong key, large data, empty data, offset. Could add a test for corrupted ciphertext (flip a byte in encrypted output, verify decrypt fails or produces wrong output).

6. Debug logging on every batch

logger.debug("Encrypted shuffle data for shuffle {} ...", shuffleId, ...);
logger.debug("Decrypted shuffle data for shuffle {} ...", shuffleId, ...);

Even with debug disabled, the varargs boxing and string formatting preparation happens. On a hot path with millions of batches, this adds up. Consider guarding with if (logger.isDebugEnabled()).


Reviewed with Claude Code

@akpatnam25
Copy link
Copy Markdown
Contributor Author

I will address the above comments in 2 weeks, I will be on vacation.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 20 out of 20 changed files in this pull request and generated 3 comments.

partitionId,
size,
decrypted.length);
size = decrypted.length;
Comment on lines +53 to +67
public byte[] decrypt(byte[] input, int offset, int length) throws IOException {
ByteArrayInputStream bais = new ByteArrayInputStream(input, offset, length);
DataInputStream dis = new DataInputStream(bais);
int decryptedLength = dis.readInt();
if (decryptedLength < 0) {
throw new IOException(
"Invalid decrypted length: " + decryptedLength + ", encrypted length: " + length);
}
try (DataInputStream cis =
new DataInputStream(CryptoStreamUtils.createCryptoInputStream(dis, sparkConf, key))) {
byte[] decrypted = new byte[decryptedLength];
cis.readFully(decrypted);
return decrypted;
}
}
Comment on lines +84 to +92
public void testEncryptWithOffset() throws IOException {
byte[] actual = "offset test data".getBytes();
byte[] padded = Arrays.copyOf(actual, actual.length + 20);

byte[] encrypted = handler.encrypt(padded, 0, actual.length);
byte[] decrypted = handler.decrypt(encrypted, 0, encrypted.length);

assertArrayEquals(actual, decrypted);
}
@SteNicholas
Copy link
Copy Markdown
Member

Review — Encryption at Rest (Spark)

Nice work. The wiring is clean and the integration points are well chosen. I traced the full write→store→read path and the design is sound. A few notes below, mostly non-blocking suggestions and a couple of questions.

What works well

  • Crypto ordering is correct: compress-then-encrypt on push (ShuffleClientImpl.pushOrMergeData) and decrypt-then-decompress on read (CelebornInputStream.fillBuffer). Encrypting after compression preserves compressibility, and the read path is symmetric.
  • Composes correctly with the end-to-end integrity check: the write-side checksum is computed over the raw uncompressed/unencrypted data (ShuffleClientImpl.java:1049-1051) and the read-side over the decrypted+decompressed data (CelebornInputStream.java:870-871). Both sides checksum the same plaintext, so EAR + celeborn.client.shuffle.integrityCheck.enabled work together.
  • Only the batch payload is encrypted — the 16-byte batch header (mapId/attemptId/batchId/length) stays plaintext, so worker-side routing/sorting and dedup are unaffected.
  • Backward-compatible surface: the 6-arg ShuffleClient.get overload is preserved (MR/Tez/Flink callers unaffected); signature changes are confined to internal create()/reader constructors with Optional.empty() defaults. Confirmed the only ShuffleClient concrete subclasses are ShuffleClientImpl and DummyShuffleClient, both updated.
  • Fresh IV per batch via CryptoStreamUtils → semantic security across batches.

Suggestions / questions

  1. No end-to-end test of the actual wiring (test coverage). SparkCryptoHandlerSuiteJ covers the handler in isolation, but every reader/writer test still passes Optional.empty(). The encrypt-on-push → decrypt-on-read path — and crucially its interaction with compression and the integrity check — has no automated coverage and relies on "tested in production internally." A round-trip test through ShuffleClientImpl/CelebornInputStream with a real handler (compression on/off × integrity-check on/off) would protect this from regressions.

  2. Per-batch allocation churn on the hot path (perf). In fillBuffer, the crypto branch reassigns compressedBuf = decrypted / rawDataBuf = decrypted to the fresh array returned by decrypt() on every batch, discarding the buffers pre-allocated in init(). Combined with decrypt()/encrypt() each allocating a ByteArray{Input,Output}Stream + a new array per call, this adds steady allocation pressure per batch. Consider a decrypt/encrypt API that writes into a reusable caller-provided buffer.

  3. PR description says "no user-facing change" — this isn't quite accurate. Users who already run with spark.io.encryption.enabled=true will now have their Celeborn shuffle data encrypted (added CPU + ~IV/length overhead per batch) where it previously was not. Worth calling out explicitly, and a docs/config page for the feature would help (the design is in a Google Doc that's not accessible to ASF reviewers — could it be folded into the CIP-22 doc?).

  4. Confidentiality vs. integrity (informational). Spark's CryptoStreamUtils default is AES/CTR, which is unauthenticated — EAR provides confidentiality but not tamper-detection on its own, and the integrity check that would catch tampering defaults to false. This matches Spark's own IO encryption, so it's a reasonable choice, but a doc note (confidentiality ≠ integrity; pair with the integrity check where tamper-resistance matters) would set expectations. The intentionally-loose assertion in testDecryptWithWrongKeyFails reflects exactly this property.

  5. commons-crypto dependency placement (question). It's added as a compile dependency to client/pom.xml, but the client module itself doesn't reference it — only client-spark/common's SparkCryptoHandler uses CryptoStreamUtils (which pulls commons-crypto transitively from spark-core). Is the explicit client dependency there to feed the shaded <include>s? If so, would declaring it in client-spark/common (where it's actually used) be cleaner?

  6. ShuffleClientImpl.cryptoHandler is non-volatile (minor). It's set in setupCryptoHandler (under the get() synchronized block) but read on task threads in pushOrMergeData/readPartition without synchronization. This mirrors the existing extension field, so it's consistent — just flagging volatile as a clarity improvement since reads happen post-init outside the lock.

  7. SparkCryptoHandler.encrypt writes the plaintext length as a cleartext int prefix (minor). It's redundant (you could read to EOF on decrypt) and leaks the exact plaintext size in the clear, though the ciphertext length already approximates it. Fine to keep — just noting.

Overall this looks solid and well-scoped. The main thing I'd push on before merge is item (1) — an integration test for the round-trip wiring.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 20 out of 20 changed files in this pull request and generated 5 comments.

Comment on lines 296 to +300
TaskContext context,
CelebornConf conf,
ShuffleReadMetricsReporter metrics,
ExecutorShuffleIdTracker shuffleIdTracker) {
ExecutorShuffleIdTracker shuffleIdTracker,
Optional<CryptoHandler> cryptoHandler) {
Comment on lines +2112 to +2116
public void setupCryptoHandler(Optional<CryptoHandler> cryptoHandler) {
this.cryptoHandler = cryptoHandler;
if (cryptoHandler.isPresent()) {
logger.info("IO encryption enabled for shuffle data (encryption at rest).");
}
Comment on lines +56 to +60
int decryptedLength = dis.readInt();
if (decryptedLength < 0) {
throw new IOException(
"Invalid decrypted length: " + decryptedLength + ", encrypted length: " + length);
}
Comment on lines +85 to +88
byte[] actual = "offset test data".getBytes();
byte[] padded = Arrays.copyOf(actual, actual.length + 20);

byte[] encrypted = handler.encrypt(padded, 0, actual.length);
Comment on lines 282 to +286
TaskContext.class,
CelebornConf.class,
ShuffleReadMetricsReporter.class,
ExecutorShuffleIdTracker.class)
ExecutorShuffleIdTracker.class,
Optional.class)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants