
feat: Arrow-direct codegen dispatcher for Spark expressions and Scala UDFs#4267

Draft
mbutrovich wants to merge 22 commits into apache:main from mbutrovich:codegen_scala_udf

Conversation

@mbutrovich (Contributor) commented May 8, 2026

Draft while we discuss how this fits with #4233 and #4239.

Which issue does this PR close?

Closes #.

Rationale for this change

#4232 merged the JVM UDF bridge: a JNI path that lets native execution call CometUDF implementations on the JVM, with Arrow FFI for data exchange. One way to extend it is to write more CometUDFs per expression, as #4239 does for the remaining Spark regex family. Another way is to expose the bridge to end users so they can register their own, as #4233 does. Both paths require a hand-written Arrow-vector implementation per expression, including one for every arbitrary ScalaUDF or Catalyst expression a user wants on the native path.

This PR proposes a different approach on top of #4232: a codegen-based dispatcher that compiles a batch-kernel CometUDF directly from a bound Catalyst Expression via Janino. Any expression whose children and output type are supported routes through native without a hand-written CometUDF. One dispatcher covers user ScalaUDFs, regex expressions, and any other Catalyst expression that Spark's codegen can already emit, with no per-expression glue.
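For illustration, a user-facing view of what this enables. The DataFrame and column names below are made up; only the claim that a plain ScalaUDF composed with a built-in can stay on the native path comes from this PR:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf, upper}

val spark = SparkSession.builder().appName("dispatch-demo").getOrCreate()
import spark.implicits._

val df = Seq("a1", "b2").toDF("x")
// An unregistered ScalaUDF, never wrapped in a hand-written CometUDF.
val mask = udf((s: String) => s.replaceAll("[0-9]", "#"))
// upper(udf(x)) is a single bound Catalyst subtree, so with the dispatcher
// enabled it can compile into one generated batch kernel.
df.select(upper(mask(col("x")))).show()
```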

Operating on bound Catalyst Expression trees affects three parts of the system:

  • Engineering: one dispatcher and one serde entry point cover N expressions. Adding a new Catalyst expression typically means relaxing CometBatchKernelCodegen.canHandle, not writing a new CometUDF class plus its serde.
  • Planning: a UDF subtree is not an opaque boundary. Ordinary Catalyst expressions, registered ScalaUDFs, and unregistered ScalaUDFs live in the same tree and are serialized together, so Comet keeps the surrounding native operators in place.
  • Execution: an entire expression subtree (for example f(g(a), h(b)) or upper(udf(x))) compiles into a single generated batch kernel with one per-row loop (sketched after this list). Every expression in the subtree runs on row i before the loop advances to row i+1. Intermediate values are local variables on the stack rather than Arrow vectors sized to the full batch. The hand-coded CometUDF path cannot fuse across UDF boundaries: the evaluate(inputs: Array[ValueVector], numRows: Int): ValueVector signature requires each UDF to consume and return fully materialized Arrow vectors, so stacking two hand-coded UDFs produces two JNI calls and one intermediate vector of numRows elements between them. The dispatcher avoids that because it sees the Catalyst subtree as source: Spark's CodegenContext and doGenCode inline each child's generated code into its parent's, so the whole subtree flattens into one straight-line sequence per row. This is the same machinery that powers WholeStageCodegen, so the per-row fusion is reused rather than reimplemented here.
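To make the fusion concrete, here is a Scala rendering of the shape the generated loop takes for upper(udf(x)). The real kernel is Java emitted through Janino; readCol0, userUdf, and writeOut stand in for the emitted Arrow reads and writes and are not identifiers from this PR.

```scala
// Illustrative sketch only: the fused per-row loop for upper(udf(x)).
// Intermediates live in locals, never in batch-sized Arrow vectors.
def kernelLoop(numRows: Int,
               readCol0: Int => String,          // stand-in for the Arrow column read
               userUdf: String => String,        // stand-in for the user's ScalaUDF body
               writeOut: (Int, String) => Unit   // stand-in for the Arrow output write
              ): Unit = {
  var i = 0
  while (i < numRows) {
    val in  = readCol0(i)       // read row i
    val mid = userUdf(in)       // udf(x): plain local on the stack
    val out = mid.toUpperCase   // upper(...): inlined by doGenCode
    writeOut(i, out)            // one output write per row
    i += 1
  }
}
```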

The dispatcher is opt-in and gated by spark.comet.exec.codegenDispatch.mode = auto | force | disabled.
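A minimal sketch of opting in, assuming an active SparkSession named spark (the config key is from this PR):

```scala
// "force" routes every eligible expression through the generated kernel,
// "auto" defers to existing native kernels first, "disabled" turns it off.
spark.conf.set("spark.comet.exec.codegenDispatch.mode", "force")
```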

Intended scope is narrower than "any expression". The primary targets are string expressions, where JVM and Rust differ on collation and regex engine semantics, and custom ScalaUDFs, where no Rust implementation exists. For numeric and other expression families with native Rust kernels, the native path is almost certainly faster and this dispatcher is not meant to replace it.

What changes are included in this PR?

Codegen dispatcher

  • CometBatchKernelCodegen - compiles a bound Expression into a specialized CometUDF via Janino. Object Scaladoc covers caching, CSE variant choice, and the full optimization menu.
  • CometCodegenDispatchUDF - the bridge's CometUDF. Carries the expression as serialized bytes. Three-layer cache (JVM-wide compile, per-thread UDF instance, per-partition kernel instance) described in its Scaladoc.
  • CometInternalRow - Arrow-vector-backed InternalRow that Spark's BoundReference.genCode reads through (a simplified standalone sketch follows this list).
  • CometArrayData - Arrow-vector-backed ArrayData shim that Spark's BoundReference.genCode uses for getArray(ord) calls. One codegen-emitted final subclass per array-typed input column, specialized on the element type.
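A simplified standalone sketch of the cursor pattern behind CometInternalRow; the real class extends Spark's InternalRow and implements every accessor, and everything here except the Arrow and Spark classes is illustrative:

```scala
import org.apache.arrow.vector.{IntVector, VarCharVector}
import org.apache.spark.unsafe.types.UTF8String

// Sketch, not the real CometInternalRow: one mutable row cursor over a fixed
// set of Arrow vectors, so BoundReference-style reads become direct vector
// accesses with no row materialization.
final class ArrowRowSketch(ints: IntVector, strs: VarCharVector) {
  private var rowIndex: Int = 0
  def setRow(i: Int): Unit = rowIndex = i

  def isNullAt(ordinal: Int): Boolean = ordinal match {
    case 0 => ints.isNull(rowIndex)
    case 1 => strs.isNull(rowIndex)
    case _ => throw new IllegalArgumentException(s"no column $ordinal")
  }
  def getInt(ordinal: Int): Int = ints.get(rowIndex)
  def getUTF8String(ordinal: Int): UTF8String =
    // The generated kernel reads zero-copy via UTF8String.fromAddress;
    // fromBytes keeps this sketch simple.
    UTF8String.fromBytes(strs.get(rowIndex))
}
```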

Complex type support (ArrayType)

  • ArrowColumnSpec is a sealed trait with ScalarColumnSpec and ArrayColumnSpec(nullable, elementSparkType, element). The element is itself an ArrowColumnSpec, so nested shapes (Array<Array<...>>) fall out of the recursion (see the trait sketch after this list). Map and Struct cases will plug into the same trait in follow-up work without disturbing callers.
  • Output writer: emitWrite emits a ListVector.startNewValue / element loop / endValue triple per row, with the per-element write recursing through emitWrite on the list's child vector. allocateOutput allocates the ListVector with its inner typed data vector, pre-sized from the input's data-buffer estimate.
  • Input reader: the kernel emits one InputArray_colN final class per array-typed input column, extending CometArrayData. Each class holds startIndex / length state reset per row from the outer ListVector's offsets; element reads go through the typed child-vector field with zero allocation (UTF8String.fromAddress for string elements, decimal128 short-precision fast path for DecimalType(p <= 18), primitive direct for others). The kernel's getArray(ord) switch resets the pre-allocated instance and returns it.
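The recursion bottoms out in a shape like the following sketch. ArrayColumnSpec's fields are from the description above; ScalarColumnSpec's fields are assumed for illustration:

```scala
import org.apache.spark.sql.types.DataType

sealed trait ArrowColumnSpec
final case class ScalarColumnSpec(nullable: Boolean, sparkType: DataType)
  extends ArrowColumnSpec
final case class ArrayColumnSpec(nullable: Boolean,
                                 elementSparkType: DataType,
                                 element: ArrowColumnSpec)
  extends ArrowColumnSpec

// Array<Array<Int>> falls out of the recursion, e.g.:
// ArrayColumnSpec(nullable = true, ArrayType(IntegerType),
//   ArrayColumnSpec(nullable = true, IntegerType,
//     ScalarColumnSpec(nullable = true, IntegerType)))
```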

Optimizations applied in the generated kernel

The generated kernel is specialized at compile time per (expression, input schema) pair, so the emitted Java carries only the chosen path at each emission site. Full enumeration, triggers, and code anchors live in the object-level Scaladoc menu on CometBatchKernelCodegen. Categories:

  • Input readers (3): zero-copy UTF8 reads for VarCharVector / ViewVarCharVector, non-nullable isNullAt elision, decimal short-value fast path for p <= 18.
  • Output writers (3): decimal short-value fast path for p <= 18 (toUnscaledLong + DecimalVector.setSafe(int, long); sketched after this list), UTF8 on-heap shortcut (pass UTF8String's backing byte[] directly, skip the redundant getBytes() allocation), pre-sized output buffers derived from input data-buffer sizes.
  • Kernel shape (3): NullIntolerant short-circuit, non-nullable output short-circuit, subexpression elimination (class-field variant).
  • Per-expression specializers (1): RegExpReplace with direct-column subject and foldable pattern / replacement bypasses the UTF8String round-trip that java.util.regex.Matcher's CharSequence requirement would otherwise force.
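As one example, the decimal output fast path reduces to roughly the following. The setSafe overloads are Arrow's; the helper itself is illustrative, since the generated kernel emits only the branch chosen at compile time:

```scala
import org.apache.arrow.vector.DecimalVector
import org.apache.spark.sql.types.Decimal

// p <= 18 means the unscaled value fits in a long, so the write skips the
// java.math.BigDecimal round-trip entirely.
def writeDecimal(vec: DecimalVector, i: Int, d: Decimal, precision: Int): Unit =
  if (precision <= 18) vec.setSafe(i, d.toUnscaledLong)  // fast path
  else vec.setSafe(i, d.toJavaBigDecimal)                // full 128-bit path
```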

Each optimization has a source-level activation assertion in CometCodegenSourceSuite. Smoke and fuzz tests cover correctness end-to-end.

Per-expression specialized path

When the default doGenCode output pays a measurable penalty because of conversions an Arrow-aware byte-oriented loop would skip, the dispatcher supports emitting custom Java for that expression while staying inside the framework (same cache, same schema-keying, same serde entry). RegExpReplace is the current example. The infrastructure is structured so future specializers can land alongside. See specializedRegExpReplaceBody for the emit and the criteria for adding new specializers.

Bridge contract

  • New numRows parameter on CometUDF.evaluate. Mirrors DataFusion's ScalarFunctionArgs.number_rows. Needed for zero-column expressions where no input vector carries batch size.
  • New TaskContext parameter. CometExecIterator captures the Spark task thread's TaskContext at createPlan time; it is stashed as a GlobalRef in the native ExecutionContext, threaded into each JvmScalarUdfExpr, and installed as the thread-local in the bridge in a try/finally so Tokio workers see a live TaskContext instead of null. Access to protected[spark] TaskContext.setTaskContext / unset goes through CometTaskContextShim in org.apache.spark.comet (sketched after this list). Fixes correctness for partition-sensitive built-ins inside UDF trees (Rand, Uuid, MonotonicallyIncreasingID) and any user UDF that calls TaskContext.get().
  • JNI signature: (Ljava/lang/String;[J[JJJILorg/apache/spark/TaskContext;)V. Native call site updated.
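Putting the contract together, a sketch of the shim and the install-around-call pattern; the shim's method names and the exact evaluate parameter list are assumed from the description above:

```scala
package org.apache.spark.comet

import org.apache.arrow.vector.ValueVector
import org.apache.spark.TaskContext

// Placed under org.apache.spark so it can reach the protected[spark] /
// private[spark] TaskContext setters; method names assumed.
object CometTaskContextShim {
  def setTaskContext(tc: TaskContext): Unit = TaskContext.setTaskContext(tc)
  def unset(): Unit = TaskContext.unset()
}

object BridgeCallSketch {
  // Install the captured TaskContext as the thread-local for the duration of
  // the call, so TaskContext.get() is live on Tokio worker threads.
  def callWithTaskContext(evaluate: (Array[ValueVector], Int) => ValueVector,
                          inputs: Array[ValueVector],
                          numRows: Int,
                          tc: TaskContext): ValueVector = {
    CometTaskContextShim.setTaskContext(tc)
    try evaluate(inputs, numRows)
    finally CometTaskContextShim.unset()
  }
}
```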

Serde routing

  • scalaUdf.scala - routes any ScalaUDF through the codegen dispatcher, no registration step required.
  • strings.scala - CodegenDispatchSerdeHelpers.pickWithMode gives every regex-family expression (rlike, regexp_replace, regexp_extract, regexp_extract_all, regexp_instr, split via StringSplit) a uniform auto | force | disabled switch (routing sketched after this list). regexp_replace, rlike, and StringSplit fall through to their existing native Rust paths when regexp.engine=rust; disabled mode returns None and Spark runs the expression.
  • CometConf.COMET_CODEGEN_DISPATCH_MODE - the mode knob.
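The routing itself reduces to a small rule. A sketch with assumed types, since pickWithMode's actual signature is not shown here, and with auto assumed to prefer the existing native kernel per the scope note above:

```scala
// `native` is the existing Rust-path serde attempt, `dispatch` the codegen
// dispatcher entry point; both are by-name so "disabled" does no work.
def pickWithMode[T](mode: String,
                    native: => Option[T],
                    dispatch: => Option[T]): Option[T] =
  mode match {
    case "disabled" => None                     // Spark runs the expression
    case "force"    => dispatch                 // always the generated kernel
    case "auto"     => native.orElse(dispatch)  // prefer the native Rust kernel
    case _          => None
  }
```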

Docs

  • docs/source/contributor-guide/jvm_udf_dispatch.md.
  • Object-level Scaladoc menu on CometBatchKernelCodegen listing every optimization with its trigger and code anchor. Single source of truth; the categories above are intentionally terse pointers.
  • In-code invariants and TODOs: subquery correctness on canHandle, cache-key hashing cost on CometCodegenDispatchUDF.CacheKey, 64KB method-size note on generateSource, WSCG-variant CSE discussion on the object Scaladoc, full zero-copy UTF8 output deferred with justification next to the StringType writer case.

How are these changes tested?

  • CometCodegenDispatchSmokeSuite - type-coverage with vector-signature assertions, composed-UDF tests (3-deep and multi-column), zero-column ScalaUDF, decimal precisions on each side of the p = 18 boundary, subquery-reuse test, three TaskContext-propagation tests (TaskContext.get().partitionId() via spark.range, same probe via a fully-native Parquet source, multi-partition rand(seed) composition), and ArrayType input / output end-to-end tests (Seq[String], Seq[Int], Seq[BigDecimal], and array-returning UDFs).
  • CometCodegenSourceSuite - generated-source assertions for every optimization in the menu: zero-copy UTF8 reads, non-nullable isNullAt elision, decimal input fast-path and slow-path emission, decimal output fast-path and slow-path emission, UTF8 output on-heap shortcut, NullIntolerant short-circuit, non-nullable output short-circuit and its nullable counterpart, CSE collapse with Length marker, CSE filter leaving Add(Rand, Rand) alone, specialized RegExpReplace emitter. Array coverage: ListVector.startNewValue / endValue emission for ArrayType output; InputArray_colN nested class with the right element-type getter for ArrayType(StringType) / ArrayType(IntegerType) / ArrayType(DecimalType) on both sides of the p = 18 boundary.
  • CometCodegenDispatchFuzzSuite - multi-column fuzz across the supported type matrix, plus decimal identity fuzz over the 18-digit boundary at several null densities.
  • CometRegExpJvmSuite - SQL-level Spark-vs-Comet correctness suite for the regex family. Passes unchanged with the dispatcher in auto and force.
  • CometScalaUDFCompositionBenchmark - four modes (Spark, Comet native built-ins, dispatcher disabled, dispatcher force) over three shapes. Numbers in the design doc.

@mbutrovich (Contributor, Author) commented:

There are about four Spark SQL test failures that look like they might need updating, but otherwise it's looking good. I won't worry about them until we discuss moving forward.
