feat: add JVM UDF framework for native execution #4232

Open
andygrove wants to merge 2 commits into apache:main from andygrove:jvm-udf-framework

Conversation

@andygrove
Member

@andygrove andygrove commented May 5, 2026

Which issue does this PR close?

Part of #4193

Rationale for this change

This PR adds the core JVM UDF framework that enables Comet to invoke JVM-side UDF implementations operating on Arrow data via JNI. This allows us to quickly implement expressions with 100% Spark compatibility without re-implementing them in native Rust code: we call existing Java/Spark code but operate on Arrow data, avoiding the expensive transition of falling back to Spark.

What changes are included in this PR?

The framework consists of:

JVM side:

  • CometUDF trait — interface that JVM UDF implementations must satisfy
  • CometUdfBridge — JNI entry point that native execution calls to invoke a UDF; handles class instantiation caching, Arrow FFI import/export, and result validation
  • CometLambdaRegistry — thread-safe registry bridging plan-time Spark expressions to execution-time UDF lookup

Native (Rust) side:

  • JvmScalarUdfExpr — DataFusion PhysicalExpr that delegates evaluation to a JVM-side CometUDF via JNI and the Arrow C Data Interface
  • CometUdfBridge JNI handle in jni-bridge — caches class/method references
  • JvmScalarUdf protobuf message — serde format for transmitting UDF invocations from plan to execution

Planner integration:

  • ExprStruct::JvmScalarUdf handling in the native planner

This is the framework only — individual expression implementations (e.g., array_exists) will be added in follow-up PRs.
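As a rough illustration of the contract described above (hypothetical names only; the real CometUDF trait operates on Arrow vectors imported over FFI, not plain arrays), a batch-oriented JVM UDF maps N same-length input columns to one output column of the same length:

```java
import java.util.Arrays;

// Hypothetical sketch, not Comet's actual API: a batch UDF receives columnar
// inputs of equal length and must return exactly one value per input row.
public class UdfContractSketch {
    interface BatchUdf {
        Object[] evaluate(Object[][] inputColumns, int numRows);
    }

    // Toy implementation: length of the string in column 0, nulls propagated.
    static final BatchUdf strLen = (cols, n) -> {
        Object[] out = new Object[n];
        for (int i = 0; i < n; i++) {
            Object v = cols[0][i];
            out[i] = (v == null) ? null : ((String) v).length();
        }
        return out;
    };

    public static void main(String[] args) {
        Object[][] batch = { { "spark", null, "jvm" } };
        // One output value per row, same length as the input column.
        System.out.println(Arrays.toString(strLen.evaluate(batch, 3)));
    }
}
```

This "one result column per input batch, same length" shape is the scalar contract; aggregates and generators, discussed later in the thread, do not fit it.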

How are these changes tested?

  • Rust compilation verified (cargo check passes for all affected crates)
  • End-to-end testing will come with the first expression implementation in a follow-up PR

Add a framework that allows Comet to invoke JVM-side UDF implementations
operating on Arrow data via JNI, avoiding expensive fallback to Spark while
maintaining 100% Spark compatibility for expressions not yet implemented
natively in Rust.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Contributor

@comphead comphead left a comment


Btw @andygrove can we use this framework for regexp udfs?

@andygrove
Member Author

Btw @andygrove can we use this framework for regexp udfs?

Yes, there is an example in #4170.

It is perfect for regexp because we get 100% compatibility with almost no effort, and it can be enabled by default.

@comphead
Contributor

comphead commented May 5, 2026

I'm also wondering whether we can use this framework for user UDFs 🤔 Currently a huge drawback in Comet is that we fall back to Spark for user-defined functions, since there is no way to transpile custom user code to the native side. Could this framework be offered to users as an alternative? Depending on UDF complexity, it may or may not be easy to rewrite a custom Spark UDF as a Comet Java UDF. For example, I anticipate some problems if the user works at the row level, i.e. updates specific values in a row; that might be more complicated in Arrow Java, but it is still promising.

@andygrove
Member Author

I'm also wondering whether we can use this framework for user UDFs 🤔 Currently a huge drawback in Comet is that we fall back to Spark for user-defined functions, since there is no way to transpile custom user code to the native side. Could this framework be offered to users as an alternative? Depending on UDF complexity, it may or may not be easy to rewrite a custom Spark UDF as a Comet Java UDF. For example, I anticipate some problems if the user works at the row level, i.e. updates specific values in a row; that might be more complicated in Arrow Java, but it is still promising.

I am already working on enabling this in #4233.

* time the serde layer registers a lambda expression under a unique key; at execution time the
* UDF retrieves it by that key (passed as a scalar argument).
*/
object CometLambdaRegistry {
Member


Where is CometLambdaRegistry used?

Member Author


// explicit per-task isolation.
private static final int CACHE_CAPACITY = 64;

private static final ThreadLocal<LinkedHashMap<String, CometUDF>> INSTANCES =
Member


Should we ensure one instance per thread? Spark/Hive UDFs don't seem to do this.

Member Author


yeah, that is a good point, will take another look
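For reference, a LinkedHashMap constructed in access order with removeEldestEntry overridden gives exactly the per-thread LRU shape the quoted snippet suggests. A minimal stdlib sketch of that shape (names simplified; CometUDF replaced by a toy interface):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class UdfInstanceCache {
    private static final int CACHE_CAPACITY = 64;

    // Toy stand-in for CometUDF; the real trait operates on Arrow vectors.
    interface Udf { String name(); }

    // Per-thread LRU cache: a LinkedHashMap in access order (third ctor arg)
    // evicts its eldest entry once capacity is exceeded.
    private static final ThreadLocal<LinkedHashMap<String, Udf>> INSTANCES =
        ThreadLocal.withInitial(() ->
            new LinkedHashMap<String, Udf>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Udf> eldest) {
                    return size() > CACHE_CAPACITY;
                }
            });

    static Udf getOrCreate(String className) {
        // Toy "instantiation": the real bridge would reflectively construct
        // the named class here on a cache miss.
        return INSTANCES.get().computeIfAbsent(className, n -> () -> n);
    }

    public static void main(String[] args) {
        Udf a = getOrCreate("com.example.MyUdf");
        Udf b = getOrCreate("com.example.MyUdf");
        // Same thread, same key: the cached instance is reused.
        System.out.println(a == b);
        System.out.println(a.name());
    }
}
```

Whether per-thread isolation is needed (versus one shared instance, as the reviewer notes Spark/Hive UDFs use) comes down to whether UDF implementations may hold mutable state.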

@mbutrovich
Contributor

This PR got me thinking about whether the per-expression CometUDF pattern could be generalized, and I prototyped a generic dispatcher on top of this PR's framework. Branch: https://github.com/mbutrovich/datafusion-comet/tree/jvm-udf-generic-dispatcher. The core file is CometGenericExpressionUDF.scala.

What it does

CometGenericExpressionUDF is one CometUDF class that evaluates an arbitrary Spark Expression tree. At plan time, the serde registers the bound expression in CometLambdaRegistry (UUID-keyed), emits a JvmScalarUdf proto pointing at the generic class, and passes data attributes as args. At execution time the UDF looks up the expression, compiles it once via GenerateMutableProjection, and loops over the input Arrow vectors using a reused SpecificInternalRow.
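A minimal stdlib sketch of that plan-time/execute-time handoff (all names hypothetical; a toy "compile" function stands in for GenerateMutableProjection):

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch: the serde registers a payload under a UUID key at plan
// time; at execute time the UDF looks it up by that key and compiles it once,
// caching the compiled form under the same key.
public class LambdaRegistrySketch {
    private static final ConcurrentHashMap<String, String> EXPRESSIONS =
        new ConcurrentHashMap<>();
    private static final ConcurrentHashMap<String, Function<String, String>> COMPILED =
        new ConcurrentHashMap<>();

    // Plan time: register the bound expression, return the key carried in the proto.
    static String register(String boundExpression) {
        String key = UUID.randomUUID().toString();
        EXPRESSIONS.put(key, boundExpression);
        return key;
    }

    // Execute time: compile on first use only (toy "compilation").
    static Function<String, String> lookupCompiled(String key) {
        return COMPILED.computeIfAbsent(key, k -> {
            String expr = EXPRESSIONS.get(k);
            return input -> expr + "(" + input + ")";
        });
    }

    public static void main(String[] args) {
        String key = register("upper");
        Function<String, String> f = lookupCompiled(key);
        System.out.println(f.apply("col"));
        // Cached: the second lookup returns the same compiled instance.
        System.out.println(lookupCompiled(key) == f);
    }
}
```

The UUID-keyed indirection is also why the registry is JVM-local, as the limitations below note: the key only resolves in the JVM that registered it.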

Benefits

  • One JVM-side class handles any scalar Spark Expression, with no per-expression hand-coded evaluator required.
  • The dispatcher evaluates composed expression trees in one JNI hop. If a child node (e.g. upper(col) inside rlike(upper(col), pattern)) isn't supported natively, the whole tree still evaluates together without forcing whole-plan fallback.
  • Benchmarks competitively with the hand-coded RegExpLikeUDF from #4239. Spark's MutableProjection codegen produces the same hot-loop shape (bytes to UTF8String to eval to result) that a hand-written loop does, so there is no inherent per-row dispatcher overhead.
  • One Janino compile per expression tree, cached by registry key.

Limitations

Near-term, fixable with incremental work

  • Types are prototype-narrow. Input is VarCharVector only, output is BitVector only. Widening is mechanical: build an Array[ColumnReader] and a ResultWriter at cache-miss time, dispatching on Arrow type and expression.dataType once per expression. Scaladoc in the prototype file sketches the shape.
  • CometLambdaRegistry is JVM-local. Driver and executor sharing a JVM works for local Spark only. Cluster mode requires serializing the bound Expression into the proto (Java serialization or Kryo) and dropping the UUID key.
  • Only CometRLike is wired for the generic path in this prototype. The serde logic is not RLike-specific and can be extracted to a single helper that any CometExpressionSerde opts into with one line.
  • Nondeterministic expressions (rand, monotonically_increasing_id) need an initialize(partitionIndex) call before the first row. Easy to add at cache-miss time.
  • Registry entries are never removed, which leaks for long-running drivers.
  • VarCharVector.get(i) copies bytes into a fresh byte[] per row. Matches RegExpLikeUDF, so the A/B comparison is fair, but both paths would improve with a reusable NullableVarCharHolder or UTF8String.fromAddress.

Longer-term, may never fully reach

  • Aggregates, window functions, and generators do not fit the CometUdfBridge "one result vector per input, same length" contract. Each needs its own bridge signature.
  • Python and Pandas UDFs are reachable in principle (they are Expression subclasses). Whether the per-row socket IPC to the Python worker is cheaper than whole-plan fallback would need to be measured.
  • Performance parity with native Rust on expressions that emit per-row allocations (decimals, arrays, strings out) is unlikely. JVM boxing through UnsafeRow and ArrayData is inherent to the evaluation shape, whereas native Rust writes directly into Arrow buffers.
  • Cross-Spark-version stability of Expression serialization is fragile. Spark internals change between releases, and a cluster-mode implementation would need a compatibility story.

Benchmark numbers

Per-row nanoseconds, lower is better. Apple M3 Max, OpenJDK 11.0.30+7-LTS, macOS 26.4.1. Source: CometRegExpBenchmark with one extra case added for the generic dispatcher.

| Pattern | Spark | Comet (Scan) | Comet (Exec, native Rust) | Comet (Exec, JVM hand-coded) | Comet (Exec, JVM generic) |
|---|---:|---:|---:|---:|---:|
| character_class `[0-9]+` | 12561.0 | 10616.9 | 4764.3 | 4377.9 | 4293.4 |
| anchored `^[0-9]` | 9077.1 | 8776.9 | 3463.7 | 3487.0 | 3384.8 |
| alternation `abc\|def\|ghi` | 12189.4 | 11970.7 | 6837.2 | 6497.1 | 6785.3 |
| multi_class `[a-zA-Z][0-9]+` | 9394.9 | 10048.6 | 4272.1 | 4193.9 | 4343.2 |
| repetition `(ab){2,}` | 9160.1 | 9146.7 | 4086.7 | 4075.5 | 4125.5 |

The generic path tracks the hand-coded path within a few percent across all five patterns. Native Rust is competitive but not dominant on these patterns, likely because the workload favors JIT-warmed backtracking over DFA construction. On adversarial patterns or non-regex expressions with tight Rust kernels, native would be expected to pull further ahead.

I think this is a super promising direction for supporting UDFs more quickly (and with 100% compatibility)! Thanks @andygrove!

@mbutrovich
Contributor

Follow-up on the generic dispatch comment above. The generic dispatcher still pays per-row Scala bookkeeping. This variant compiles the whole per-batch loop, Arrow reads and writes included, into one Janino-produced method. Matches the hand-coded RegExpLikeUDF across all patterns, closes the multi_class gap the MutableProjection dispatcher showed, and keeps one dispatcher class for any scalar Spark Expression. Branch: https://github.com/mbutrovich/datafusion-comet/tree/jvm-udf-generic-dispatcher. Core files: CometCodegenDispatchUDF.scala, CometBatchKernelCodegen.scala, ArrowBackedRow.scala.

What it does

Plan-time wiring is identical to the MutableProjection dispatcher: bind the Spark Expression, register it in CometLambdaRegistry, pass the registry key as arg 0 of the JvmScalarUdf proto, pass data columns as args 1..N.

At execute time, on cache miss per registry key, CometBatchKernelCodegen emits a specialized CometBatchKernel subclass whose process method looks like:

void process(VarCharVector[] inputs, BitVector output, int numRows) {
  ArrowBackedRow row = new ArrowBackedRow(inputs);
  for (int i = 0; i < numRows; i++) {
    row.setRowIdx(i);
    <inlined expr.genCode(ctx) output>
    if (<result.isNull>) output.setNull(i);
    else output.set(i, <result.value> ? 1 : 0);
  }
}

ArrowBackedRow extends InternalRow through a CometInternalRow shim. BoundReference.genCode emits row.getUTF8String(ord), which reads the Arrow vector at the current row index directly. No per-row SpecificInternalRow.update, no MutableProjection.apply virtual call, no intermediate InternalRow between expression evaluation and the Arrow write. One Janino compile per expression tree, cached by registry key.
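A toy sketch of that row-over-columns pattern, with plain Java arrays standing in for Arrow vectors (names illustrative, not the branch's actual classes): a single reusable row view is advanced by index, so the fused per-batch loop allocates nothing per row.

```java
public class ColumnarRowSketch {
    // Reusable "row" that points into columnar storage and is repositioned
    // by index, in the spirit of ArrowBackedRow.setRowIdx.
    static final class RowView {
        private final String[][] columns; // columns[ordinal][rowIdx]
        private int rowIdx;
        RowView(String[][] columns) { this.columns = columns; }
        void setRowIdx(int i) { rowIdx = i; }
        boolean isNullAt(int ordinal) { return columns[ordinal][rowIdx] == null; }
        String getString(int ordinal) { return columns[ordinal][rowIdx]; }
    }

    // Fused per-batch loop in the shape of the generated process() method:
    // advance the row view, evaluate, write one output slot per row.
    static boolean[] process(String[][] inputs, int numRows, String pattern) {
        boolean[] output = new boolean[numRows];
        RowView row = new RowView(inputs);
        for (int i = 0; i < numRows; i++) {
            row.setRowIdx(i);
            // Toy evaluation; the real kernel inlines Spark's doGenCode output here.
            output[i] = !row.isNullAt(0) && row.getString(0).matches(pattern);
        }
        return output;
    }

    public static void main(String[] args) {
        String[][] batch = { { "abc123", "xyz", null } };
        boolean[] result = process(batch, 3, ".*[0-9]+.*");
        System.out.println(java.util.Arrays.toString(result));
    }
}
```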

Three approaches side by side

| Aspect | Hand-coded CometUDF (this PR) | Generic MutableProjection | Arrow-direct codegen |
|---|---|---|---|
| Classes per expression | One | Zero | Zero |
| Per-row batch loop | Hand-written Scala | Interpreted Scala | Compiled Java |
| Arrow read and write | Hand-written | Interpreted Scala | Compiled Java |
| Expression evaluation | Hand-written | Compiled (projection codegen) | Compiled (doGenCode), inlined into the fused loop |
| Composed expression trees | No, without native support for children | Yes | Yes |
| Adding a new scalar expression | New UDF class + serde branch | Free in the supported type set | Free in the supported type set |
| Current input types | Per UDF | StringType | StringType |
| Current output types | Per UDF | BooleanType | BooleanType |
| Widening inputs | Per UDF | Add ColumnReader cases | Add getters to ArrowBackedRow |
| Widening outputs | Per UDF | Dispatch in evaluate | Dispatch in codegen template |
| Cluster mode today | Works | Local only | Local only |
| Aggregates, windows, generators | Requires bridge change | Same | Same |

Both dispatcher paths share the same architectural limits (local-mode registry, scalar-shape one-in-one-out, no Nondeterministic.initialize, never-freed registry entries, per-row UTF8String.fromBytes copy). None block measurement. All are addressable incrementally without reshaping either dispatcher.

Benchmark numbers

Per-row nanoseconds, lower is better. Apple M3 Max, OpenJDK 11.0.30+7-LTS, macOS 26.4.1. From CometRegExpBenchmark with a sixth column for the codegen variant.

| Pattern | Spark | Comet (Scan) | Comet (Exec, native Rust) | Comet (Exec, JVM hand-coded) | Comet (Exec, JVM generic) | Comet (Exec, JVM codegen) |
|---|---:|---:|---:|---:|---:|---:|
| character_class `[0-9]+` | 11468.3 | 10554.4 | 4903.0 | 4181.6 | 4365.6 | 4108.0 |
| anchored `^[0-9]` | 8562.0 | 8548.7 | 3306.3 | 3263.2 | 3355.7 | 3137.7 |
| alternation `abc\|def\|ghi` | 11060.1 | 11167.9 | 5778.0 | 5754.4 | 5847.5 | 5769.7 |
| multi_class `[a-zA-Z][0-9]+` | 9255.0 | 9469.0 | 4179.3 | 4401.4 | 5323.0 | 4152.2 |
| repetition `(ab){2,}` | 8930.1 | 8846.0 | 3932.4 | 3860.3 | 3937.4 | 3879.9 |

Reading the three JVM columns:

  • vs. hand-coded RegExpLikeUDF: codegen is within noise on all five patterns. No regression.
  • vs. MutableProjection generic dispatch: codegen matches on four patterns and is faster on multi_class (4152 vs 5323 per row), where the Scala-side per-row update loop trailed the hand-coded path. Fusing the Arrow read and write into the compiled loop closes that gap.

Regex matching dominates at this cardinality, so per-row dispatcher bookkeeping barely moves the needle when matches are expensive. On patterns where per-row work is larger relative to regex work (short subjects, cheap patterns, composed expression trees with several children), the codegen variant's lead over the MutableProjection dispatcher should widen. This benchmark does not stress that regime.

Limitations

Carried over from the MutableProjection variant:

  • CometLambdaRegistry is JVM-local. Cluster mode needs serialized-expression bytes in the proto.
  • Scalar shape only. Aggregates, windows, generators need a different bridge.
  • Nondeterministic.initialize(partitionIndex) is not wired.
  • Registry entries are never removed.
  • Per-row UTF8String.fromBytes(vec.get(i)) copies into a fresh byte[]. Matches hand-coded, so the A/B is fair. Both paths improve with UTF8String.fromAddress.

Specific to the codegen variant:

  • ArrowBackedRow implements isNullAt and getUTF8String only. Other getters inherit the CometInternalRow shim's throwing defaults until widened.
  • The codegen template's output write is hardcoded to BitVector and boolean. New output types are additive cases.
  • Dictionary-encoded Arrow inputs are not handled.

Widening is local: one getter in ArrowBackedRow per new input Arrow type, one case in the codegen template per new output Spark type. Expression evaluation comes from Spark's doGenCode, so composition is free.

On #4239

The follow-up PR #4239 adds hand-coded UDFs covering the remaining regex expressions on this framework. LLM-assisted authoring makes that work cheap at write time, but every class is code to review, test against Spark semantics across versions, and maintain as Spark evolves. The generic dispatcher covers the same surface with zero per-expression code. Worth weighing the two directions before merging, since committing to the hand-coded catalogue makes a later switch to the dispatcher path harder to justify.

Bottom line

Matches hand-coded performance, closes the MutableProjection dispatcher's multi_class gap, costs zero Scala per new scalar expression. The Arrow-direct codegen dispatcher is the shape to reach for before writing a new CometUDF, with hand-coded kept only where the dispatcher's remaining gaps matter. Thanks again @andygrove, this is a very exciting solution!
