17 commits
a454360
feat: add optimized PyArrow UDF execution (CometPythonMapInArrowExec)
andygrove May 6, 2026
84aec84
docs: add PyArrow UDF acceleration user guide page
andygrove May 6, 2026
af98fbb
fix(test): correct PyArrow UDF integration test signatures and assert…
andygrove May 6, 2026
f29cb2f
test: convert PyArrow UDF script to pytest and add CI coverage
andygrove May 6, 2026
f751539
docs: run prettier on pyarrow-udfs user guide page
andygrove May 6, 2026
b14fbfb
style: apply spotless formatting
andygrove May 6, 2026
ca0bbbf
ci: broaden pyarrow_udf_test triggers to match pr_build_linux
andygrove May 6, 2026
55c28c3
ci: restrict GITHUB_TOKEN to contents:read in pyarrow_udf_test
andygrove May 6, 2026
05b1e7a
fix: shim CometPythonMapInArrowExec for cross-version Spark builds
andygrove May 6, 2026
66eb246
ci: switch pyarrow_udf_test container to rust:bookworm
andygrove May 6, 2026
ec6fa78
ci: set PYSPARK_PYTHON to venv python for pyarrow_udf_test
andygrove May 6, 2026
1de2c2f
feat: default-disable PyArrow UDF optimization while experimental
andygrove May 6, 2026
3f68cbe
test: expand PyArrow UDF pytest coverage
andygrove May 6, 2026
e2ca2d2
docs: document PyArrow UDF limitations and AQE explain quirk
andygrove May 6, 2026
f4b5c32
bench: add Python end-to-end benchmark for PyArrow UDF acceleration
andygrove May 6, 2026
3822ed7
fix: propagate isBarrier through CometPythonMapInArrowExec
andygrove May 6, 2026
24dc84b
refactor: address PR review feedback for PyArrow UDF acceleration
andygrove May 8, 2026
1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
@@ -354,6 +354,7 @@ jobs:
org.apache.comet.exec.CometGenerateExecSuite
org.apache.comet.exec.CometWindowExecSuite
org.apache.comet.exec.CometJoinSuite
org.apache.comet.exec.CometPythonMapInArrowSuite
org.apache.comet.CometNativeSuite
org.apache.comet.CometSparkSessionExtensionsSuite
org.apache.spark.CometPluginsSuite
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
@@ -193,6 +193,7 @@ jobs:
org.apache.comet.exec.CometGenerateExecSuite
org.apache.comet.exec.CometWindowExecSuite
org.apache.comet.exec.CometJoinSuite
org.apache.comet.exec.CometPythonMapInArrowSuite
org.apache.comet.CometNativeSuite
org.apache.comet.CometSparkSessionExtensionsSuite
org.apache.spark.CometPluginsSuite
112 changes: 112 additions & 0 deletions .github/workflows/pyarrow_udf_test.yml
@@ -0,0 +1,112 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: PyArrow UDF Tests

concurrency:
  group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }}
  cancel-in-progress: true

on:
  push:
    branches:
      - main
    paths-ignore:
      - "benchmarks/**"
      - "doc/**"
      - "docs/**"
      - "**.md"
      - "dev/changelog/*.md"
      - "native/core/benches/**"
      - "native/spark-expr/benches/**"
      - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
      - "spark/src/main/scala/org/apache/comet/GenerateDocs.scala"
  pull_request:
    paths-ignore:
      - "benchmarks/**"
      - "doc/**"
      - "docs/**"
      - "**.md"
      - "dev/changelog/*.md"
      - "native/core/benches/**"
      - "native/spark-expr/benches/**"
      - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
      - "spark/src/main/scala/org/apache/comet/GenerateDocs.scala"
  workflow_dispatch:

permissions:
  contents: read

env:
  RUST_VERSION: stable
  RUST_BACKTRACE: 1
  RUSTFLAGS: "-Clink-arg=-fuse-ld=bfd"

jobs:
  pyarrow-udf:
    name: PyArrow UDF (Spark 4.0, JDK 17, Python 3.11)
    runs-on: ubuntu-latest
    container:
      # Pinned to the Debian 12 (bookworm) base so the system `python3` is 3.11. The default
      # `amd64/rust` image is Debian 13 (trixie) which ships Python 3.13 and no python3.11 apt
      # package, breaking `apt-get install python3.11`.
      image: rust:bookworm
      env:
        JAVA_TOOL_OPTIONS: "--add-exports=java.base/sun.nio.ch=ALL-UNNAMED --add-exports=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED"
    steps:
      - uses: actions/checkout@v6

      - name: Setup Rust & Java toolchain
        uses: ./.github/actions/setup-builder
        with:
          rust-version: ${{ env.RUST_VERSION }}
          jdk-version: 17

      - name: Cache Maven dependencies
        uses: actions/cache@v5
        with:
          path: |
            ~/.m2/repository
            /root/.m2/repository
          key: ${{ runner.os }}-java-maven-${{ hashFiles('**/pom.xml') }}-pyarrow-udf
          restore-keys: |
            ${{ runner.os }}-java-maven-

      - name: Build Comet (debug, Spark 4.0 / Scala 2.13)
        run: |
          cd native && cargo build
          cd .. && ./mvnw -B install -DskipTests -Pspark-4.0 -Pscala-2.13

      - name: Install Python 3.11 and pip
        run: |
          apt-get update
          apt-get install -y --no-install-recommends python3 python3-venv python3-pip
          python3 -m venv /tmp/venv
          /tmp/venv/bin/pip install --upgrade pip
          /tmp/venv/bin/pip install "pyspark==4.0.1" "pyarrow>=14" pandas pytest

      - name: Run PyArrow UDF pytest
        env:
          # Spark launches Python workers in a fresh subprocess and looks up `python3`
          # on PATH unless PYSPARK_PYTHON is set. Without this, workers use the system
          # python which has no pyarrow installed and UDF execution fails with
          # ModuleNotFoundError.
          PYSPARK_PYTHON: /tmp/venv/bin/python
          PYSPARK_DRIVER_PYTHON: /tmp/venv/bin/python
        run: |
          /tmp/venv/bin/python -m pytest -v \
            spark/src/test/resources/pyspark/test_pyarrow_udf.py
12 changes: 12 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -314,6 +314,18 @@ object CometConf extends ShimCometConf {
    .booleanConf
    .createWithDefault(false)

  val COMET_PYARROW_UDF_ENABLED: ConfigEntry[Boolean] =
    conf("spark.comet.exec.pyarrowUdf.enabled")
      .category(CATEGORY_EXEC)
      .doc(
        "Experimental: whether to enable optimized execution of PyArrow UDFs " +
          "(mapInArrow/mapInPandas). When enabled, Comet passes Arrow columnar data " +
          "directly to Python UDFs without the intermediate Arrow-to-Row-to-Arrow " +
          "conversion that Spark normally performs. Disabled by default while the " +
          "feature stabilizes.")
      .booleanConf
      .createWithDefault(false)

  val COMET_TRACING_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.tracing.enabled")
    .category(CATEGORY_TUNING)
    .doc(s"Enable fine-grained tracing of events and memory usage. $TRACING_GUIDE.")
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/index.rst
@@ -38,5 +38,6 @@ Comet $COMET_VERSION User Guide
Understanding Comet Plans <understanding-comet-plans>
Tuning Guide <tuning>
Metrics Guide <metrics>
PyArrow UDF Acceleration <pyarrow-udfs>
Iceberg Guide <iceberg>
Kubernetes Guide <kubernetes>
188 changes: 188 additions & 0 deletions docs/source/user-guide/latest/pyarrow-udfs.md
@@ -0,0 +1,188 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# PyArrow UDF Acceleration

Comet can accelerate Python UDFs that use PyArrow-backed batch processing, such as `mapInArrow` and `mapInPandas`.
These APIs are commonly used for ML inference, feature engineering, and data transformation workloads.

## Background

Spark's `mapInArrow` and `mapInPandas` APIs allow users to apply Python functions that operate on Arrow
RecordBatches or Pandas DataFrames. Under the hood, Spark communicates with the Python worker process
using the Arrow IPC format.

Without this optimization, the execution path for these UDFs involves unnecessary data conversions:

1. Comet reads data in Arrow columnar format (via CometScan)
2. Spark inserts a ColumnarToRow transition (converts Arrow to UnsafeRow)
3. The Python runner converts those rows back to Arrow to send to Python
4. Python executes the UDF on Arrow batches
5. Results are returned as Arrow and then converted back to rows

Steps 2 and 3 are redundant since the data starts and ends in Arrow format.

## How Comet Optimizes This

When enabled, Comet detects `PythonMapInArrowExec` / `MapInArrowExec` and `MapInPandasExec`
operators in the physical plan and replaces them with `CometMapInBatchExec`, which:

- Reads Arrow columnar batches directly from the upstream Comet operator
- Feeds them to the Python runner without the expensive UnsafeProjection copy
- Keeps the Python output in columnar format for downstream operators

This eliminates the ColumnarToRow transition and the output row conversion, reducing CPU overhead
and memory allocations. The internal row-to-Arrow IPC re-encoding inside Spark's
`ArrowPythonRunner` is unchanged in this version; full round-trip elimination is tracked in
[#4240](https://github.com/apache/datafusion-comet/issues/4240).

### Plan flow

Without Comet's optimization:

```
PythonMapInArrow / MapInArrow / MapInPandas
+- ColumnarToRow          <- Arrow -> Row copy
   +- CometNativeExec     <- Arrow batch
      +- CometScan
```

With the optimization enabled:

```
CometMapInBatch <- Arrow batch in/out, Python runner attached
+- CometNativeExec
   +- CometScan
```

## Configuration

The optimization is experimental and disabled by default. Enable it with:

```
spark.comet.exec.pyarrowUdf.enabled=true
```

The default is `false` while the feature stabilizes.

## Supported APIs

| PySpark API | Spark Plan Node | Supported |
| -------------------------------- | --------------------------- | --------- |
| `df.mapInArrow(func, schema)` | `PythonMapInArrowExec` | Yes |
| `df.mapInPandas(func, schema)` | `MapInPandasExec` | Yes |
| `@pandas_udf` (scalar) | `ArrowEvalPythonExec` | Not yet |
| `df.applyInPandas(func, schema)` | `FlatMapGroupsInPandasExec` | Not yet |

## Example

```python
from typing import Iterator

import pyarrow as pa
from pyspark.sql import SparkSession, types as T

spark = SparkSession.builder \
    .config("spark.plugins", "org.apache.spark.CometPlugin") \
    .config("spark.comet.enabled", "true") \
    .config("spark.comet.exec.enabled", "true") \
    .config("spark.comet.exec.pyarrowUdf.enabled", "true") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "2g") \
    .getOrCreate()

df = spark.read.parquet("data.parquet")

def transform(batches: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
    # mapInArrow calls the function with an iterator of RecordBatches,
    # not a single batch, and expects an iterator back.
    for batch in batches:
        # Your transformation logic here
        pdf = batch.to_pandas()
        pdf["new_col"] = pdf["value"] * 2
        yield pa.RecordBatch.from_pandas(pdf)

output_schema = T.StructType([
    T.StructField("value", T.DoubleType()),
    T.StructField("new_col", T.DoubleType()),
])

result = df.mapInArrow(transform, output_schema)
```
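
For comparison, a minimal `mapInPandas` sketch of the same transformation (assuming the same `value` column): the function receives and yields pandas DataFrames, and the schema may also be given as a DDL string.

```python
from typing import Iterator

import pandas as pd

def transform_pandas(dfs: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    # mapInPandas passes an iterator of pandas DataFrames, one per Arrow batch
    for pdf in dfs:
        pdf["new_col"] = pdf["value"] * 2
        yield pdf

result = df.mapInPandas(transform_pandas, "value double, new_col double")
```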

## Verifying the Optimization

Use `explain()` to verify that `CometMapInBatch` appears in your plan:

```python
result.explain(mode="extended")
```

You should see:

```
CometMapInBatch ...
+- CometNativeExec ...
   +- CometScan ...
```

Instead of the unoptimized plan:

```
PythonMapInArrow ...
+- ColumnarToRow
   +- CometNativeExec ...
      +- CometScan ...
```

When AQE is enabled (the Spark default) and the query contains a shuffle, the
optimization is applied during stage materialization. Calling `explain()` before
running an action will show the unoptimized plan:

```
AdaptiveSparkPlan isFinalPlan=false
+- PythonMapInArrow ...
   +- CometExchange ...
```

To see the optimized plan, run an action first (for example `result.collect()` or
`result.cache(); result.count()`) and then call `explain()`. The post-execution
plan shows the materialized stages and includes `CometMapInBatch` if the
optimization fired.
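
For example:

```python
result.collect()  # run an action so AQE materializes the query stages
result.explain()  # the final plan now includes CometMapInBatch if the rewrite fired
```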

## Barrier execution

`mapInArrow(..., barrier=True)` and `mapInPandas(..., barrier=True)` are honored: the
optimized operator propagates `isBarrier` through `RDD.barrier()`, so all tasks are
gang-scheduled and `BarrierTaskContext.barrier()` works inside the UDF the same way it does
on the unoptimized path.
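
A minimal sketch of a barrier-mode UDF; the synchronization point before processing is illustrative, not required:

```python
from typing import Iterator

import pyarrow as pa
from pyspark import BarrierTaskContext

def synchronized_transform(batches: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
    # Blocks until every gang-scheduled task in the stage reaches this point.
    BarrierTaskContext.get().barrier()
    for batch in batches:
        yield batch

result = df.mapInArrow(synchronized_transform, df.schema, barrier=True)
```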

## Limitations

- The optimization currently applies only to `mapInArrow` and `mapInPandas`. Scalar pandas UDFs
(`@pandas_udf`) and grouped operations (`applyInPandas`) are not yet supported.
- The internal row-to-Arrow conversion inside the Python runner is still present in this version.
Comet currently routes columnar input through `ColumnarBatch.rowIterator()` so that the existing
`ArrowPythonRunner` can re-encode the rows back to Arrow IPC. A future optimization will write
Arrow batches directly to the Python IPC stream, eliminating the remaining round-trip and
achieving near zero-copy data transfer.
- The optimization requires Arrow data on the input side. If a shuffle sits between the upstream
  Comet operator and the Python UDF, you need Comet's native shuffle for the optimization to
  apply. Set `spark.shuffle.manager` to
  `org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager` and enable
  `spark.comet.exec.shuffle.enabled=true` at session startup. With a vanilla Spark `Exchange`
  in the plan, the data leaves the shuffle as rows and the optimization cannot fire. A session
  setup with these options is sketched after this list.
- Spark 3.4 lacks several APIs the optimization depends on (`MapInBatchExec.isBarrier`,
`arrowUseLargeVarTypes`, `JobArtifactSet`, the modern `ArrowPythonRunner` constructor). On
Spark 3.4 the feature is a no-op even when enabled. Spark 3.5+ is required.
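
Combining the shuffle options above with the configuration from the earlier example, a session setup might look like this:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.plugins", "org.apache.spark.CometPlugin") \
    .config("spark.comet.enabled", "true") \
    .config("spark.comet.exec.enabled", "true") \
    .config("spark.comet.exec.pyarrowUdf.enabled", "true") \
    .config("spark.comet.exec.shuffle.enabled", "true") \
    .config("spark.shuffle.manager",
            "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "2g") \
    .getOrCreate()
```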