Skip to content

Introduced explicit index resolution API#1

Open
nibix wants to merge 1 commit into
mainfrom
explicit-index-api
Open

Introduced explicit index resolution API#1
nibix wants to merge 1 commit into
mainfrom
explicit-index-api

Conversation

@nibix

@nibix nibix commented Jun 10, 2025

Copy link
Copy Markdown
Owner

Description

This introduces a new method that allows TransportAction implementations to report the indices the will actually operate on via a standard API to any ActionFilter implementation. This can be then used for example by access control filters to check authorization.

This has several advantages:

  • Actually, the way transport actions interpret indices specified in IndicesRequests is extremely diverse. Some do resolution of all expressions and rely on index options, some do resolution of all expressions and modify index options, some do only date math expression resolution, and some do no resolution at all. This change makes gives a method to make it very explicit which indices will be touched by an action.
  • Actually, the case where no resolution happens at all is quite common - that's the case for most shard-based actions. This allows us to avoid a costly resolution for all these cases. This should give a significant performance boost.

Note: This is not complete yet and is intended only as a sneak-peek.

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

nibix pushed a commit that referenced this pull request May 22, 2026
…earch-project#21513)

* [Analytics Engine] Port json_array_length to DataFusion backend

First PPL json_* function wired through PPL → Calcite → Substrait →
DataFusion. Scaffolds the pattern every follow-up UDF reuses: Rust kernel
+ YAML signature + ScalarFunction enum entry + JsonFunctionAdapters
rename + FunctionMappings.s(...) binding + STANDARD_PROJECT_OPS entry.

Rust UDF (rust/src/udf/json_array_length.rs) coerces the input to Utf8,
parses with serde_json, and returns Int32 to match PPL's
INTEGER_FORCE_NULLABLE declaration — returning Int64 would leak through
column-valued calls even though literal args const-fold via a narrowing
CAST. Malformed / non-array / NULL input → NULL, matching legacy
JsonArrayLengthFunctionImpl's NullPolicy.ANY + Gson parity.

ScalarFunction.CAST added to STANDARD_PROJECT_OPS so PPL's implicit CAST
around a UDF call (inserted when the UDF's declared return type differs
from the eval column's inferred type) doesn't fail OpenSearchProjectRule
with "No backend supports scalar function [CAST]". DataFusion handles
CAST natively — no UDF needed.

STANDARD_PROJECT_OPS and scalarFunctionAdapters reshaped to one-entry-
per-line (Map.ofEntries / Set.of) so parallel json_* PRs append without
touching neighbour lines.

Tests:
  * 10 Rust unit tests (flat/nested arrays, non-array, malformed, NULL,
    coerce_types accept/reject, arity guard, scalar-input fast path).
  * JsonFunctionAdaptersTests guards adapter shape + return-type
    preservation (BIGINT vs LOCAL_OP's INTEGER_NULLABLE).
  * ScalarJsonFunctionIT covers happy path, empty array, non-array
    object → NULL, malformed → NULL via /_analytics/ppl.

Parity-checked against legacy SQL plugin
CalcitePPLJsonBuiltinFunctionIT.testJsonArrayLength.

Signed-off-by: Eric Wei <mengwei.eric@gmail.com>

* [Analytics Engine] JSON: introduce jsonpath-rust parser + shared helpers

Lands the parser crate + a small shared helpers module ahead of the per-
function json_* UDFs. Keeping this on its own commit lets reviewers sign
off on the crate choice (jsonpath-rust 0.7) and path-conversion behaviour
before 8 UDF bodies land on top.

  * rust/Cargo.toml: add jsonpath-rust = "0.7".
  * rust/src/udf/json_common.rs:
      - convert_ppl_path: PPL path syntax (`a{i}.b{}`) -> JSONPath (`$.a[i].b[*]`).
        Mirrors JsonUtils.convertToJsonPath in sql/core. Empty string maps
        to "$" to match legacy root semantics.
      - parse: serde_json wrapper returning None on malformed input, the
        contract every json_* UDF will share.
      - check_arity / check_arity_range: plan_err! wrappers for the
        top-of-invoke guards.
  * rust/src/udf/mod.rs: register the module (helpers are crate-private).

Consumers land in follow-up commits on the same PR (opensearch-project#21513); a module-
level #![allow(dead_code)] keeps this commit's cargo check clean.

Signed-off-by: Eric Wei <mengwei.eric@gmail.com>

* [Analytics Engine] Port json_keys to DataFusion backend

Adds the second PPL json_* UDF on top of opensearch-project#21476 (json_array_length).
Matches the legacy SQL-plugin contract: object → JSON-array-encoded keys
in insertion order; non-object / malformed / scalar → SQL NULL.

- Rust UDF at rust/src/udf/json_keys.rs with scalar + columnar paths
- Shared rust/src/udf/json_common.rs helpers (parse, arity, Utf8 downcast,
  PPL-path → JSONPath) seeded for later json_* UDFs
- serde_json preserve_order feature to preserve legacy LinkedHashMap ordering
- Java wiring: ScalarFunction.JSON_KEYS, JsonKeysAdapter, Substrait sig,
  YAML signature, plugin project-op + adapter registration
- ScalarJsonFunctionIT parity test for the four legacy fixtures

Signed-off-by: Eric Wei <mengwei.eric@gmail.com>

* [Analytics Engine] Port json_extract to DataFusion backend

Rust UDF at rust/src/udf/json_extract.rs wraps jsonpath-rust: single path →
unquoted scalar or JSON-serialized container; multi-path → JSON array with
literal null slots for misses. < 2 args, malformed doc, malformed path, and
explicit-null matches all collapse to SQL NULL, matching legacy
JsonExtractFunctionImpl's calcite jsonQuery/jsonValue pair.

JsonExtractAdapter renames the PPL call to the Rust UDF name via the variadic
path; routing lives in FunctionMappings.s(...) in DataFusionFragmentConvertor
and the STANDARD_PROJECT_OPS allow-list.

Also fixes a pre-existing transport bug in DatafusionResultStream.getFieldValue:
VarCharVector.getObject returns Arrow Text, which StreamOutput.writeGenericValue
cannot serialize, so string-valued UDF results (json_keys, json_extract) were
dropped when shard results traveled back to the coordinator. Converting
VarCharVector cells to String at the source mirrors ArrowValues.toJavaValue
and unblocks every string-returning UDF.

Parity IT (ScalarJsonFunctionIT) replays four verbatim legacy cases covering
single-path scalar/container match, wildcard multi-match, multi-path with
missing path, and explicit-null resolution.

Signed-off-by: Eric Wei <mengwei.eric@gmail.com>

* [Analytics Engine] Port json_delete to DataFusion backend

Mutation UDF #1. Introduces the shared mutation walker that json_set,
json_append, and json_extend will reuse on the same PR.

Rust side (rust/src/udf/json_delete.rs + json_common.rs):
  * `parse_ppl_segments` tokenises PPL paths (a.b{0}.c{}) into Field /
    Index / Wildcard segments without allocating field names.
  * `walk_mut` drives a mutation closure against every terminal match in
    a serde_json::Value; missing intermediate keys and out-of-range
    indices are silent no-ops, matching Jayway's SUPPRESS_EXCEPTIONS
    behaviour that legacy `JsonDeleteFunctionImpl` (→ Calcite
    `JsonFunctions.jsonRemove`) relies on.
  * `json_delete` terminal closure: `shift_remove` on Object (preserves
    insertion order via serde_json's `preserve_order` feature),
    `Vec::remove` on Array-with-Index, `Vec::clear` on Array-with-Wildcard.
    Any-NULL-arg / malformed doc / malformed path → NULL.

The walker is generic enough that json_set / json_append / json_extend
are now pure terminal-closure swaps (set value, push value, extend
array) — no further traversal plumbing needed.

Java side:
  * JSON_DELETE added to `ScalarFunction`, `STANDARD_PROJECT_OPS`, and
    `scalarFunctionAdapters`.
  * `JsonDeleteAdapter` is a plain `AbstractNameMappingAdapter` rename
    (matches the other json_* adapters).
  * Substrait YAML signature uses `variadic: {min: 1}` — same shape as
    json_extract.

Tests:
  * 10 Rust unit tests for json_delete (4 legacy IT fixtures replayed:
    flat-key, nested, missing-path-unchanged, wildcard-array; plus
    any-NULL / malformed / coerce_types / return_type).
  * 4 new walker tests in json_common (tokeniser, flat-delete,
    missing-noop, wildcard-fan-out, index-out-of-range-noop).
  * ScalarJsonFunctionIT gains `testJsonDeleteParityWithLegacy`
    replaying all 4 legacy assertions.

Parity-checked against legacy SQL plugin
`CalcitePPLJsonBuiltinFunctionIT.testJsonDelete*`.

Signed-off-by: Eric Wei <mengwei.eric@gmail.com>

* [Analytics Engine] Port json_set to DataFusion backend

Mutation UDF #2. Reuses the walker introduced by #json_delete; this
commit is a pure terminal-closure swap on the Rust side (replace, not
remove) plus the usual 7-file Java/YAML wiring.

Rust side (rust/src/udf/json_set.rs):
  * Terminal closure overwrites only existing keys on Object
    (`map.contains_key` guard), in-range slots on Array-with-Index, and
    every element on Array-with-Wildcard. This is the replace-only
    semantics from legacy `JsonSetFunctionImpl` (→ Calcite
    `JsonFunctions.jsonSet`, which guards `ctx.set` with
    `ctx.read(k) != null`).
  * Variadic arity: (doc, path1, val1, [path2, val2, ...]). Fewer than
    3 args or an odd total (unpaired trailing path) short-circuits to
    NULL, mirroring the "malformed input → NULL" convention the other
    json_* UDFs follow.
  * Values are always stored as `Value::String` because every arg is
    coerced to Utf8 by `coerce_types` — matches the legacy fixture's
    `"b":"3"` (stringified, not numeric).
  * Root-path (`parse_ppl_segments` returns empty) is a no-op to match
    Jayway's behaviour: `ctx.set("$", v)` silently fails because the
    root is indelible and unreplaceable.

Java side:
  * JSON_SET added to `ScalarFunction`, `STANDARD_PROJECT_OPS`, and
    `scalarFunctionAdapters`.
  * `JsonSetAdapter` is a plain `AbstractNameMappingAdapter` rename.
  * Substrait YAML signature uses `variadic: {min: 1}` — same shape as
    json_extract / json_delete.

Tests:
  * 9 Rust unit tests for json_set (3 legacy IT fixtures replayed:
    wildcard-replace, wrong-path-unchanged, partial-wildcard-set; plus
    multi-pair / any-NULL / malformed-doc / malformed-path /
    coerce_types / return_type).
  * ScalarJsonFunctionIT gains `testJsonSetParityWithLegacy` replaying
    all 3 legacy assertions.

Parity-checked against legacy SQL plugin
`CalcitePPLJsonBuiltinFunctionIT.testJsonSet*`.

Signed-off-by: Eric Wei <mengwei.eric@gmail.com>

* [Analytics Engine] Port json_append to DataFusion backend

Mutation UDF opensearch-project#3. Another walker reuse: terminal closure pushes the
paired value onto array-valued targets (non-array / missing targets
are silent no-ops).

Rust side (rust/src/udf/json_append.rs):
  * Terminal closure branches: Object+Field → look up field, if it's an
    Array push the stringified value; Array+Index → if the indexed slot
    is an Array, push; Array+Wildcard → push onto every array-valued
    child. Non-array matches are skipped, matching legacy
    `JsonFunctions.jsonInsert` via Jayway's Collection-parent branch
    (`Collection.add`) which is how `JsonAppendFunctionImpl`'s
    `.meaningless_key` suffix trick ultimately expands.
  * Variadic arity (doc, path1, val1, [path2, val2, ...]). Fewer than 3
    args or an odd total (unpaired trailing path) → NULL — the
    malformed-input-to-NULL convention all other json_* UDFs share.
    Matches legacy's `RuntimeException("needs corresponding path and
    values")` observably-as-error via NULL surface.
  * Pre-stringified values: all args are Utf8-coerced at `coerce_types`
    entry, so nested `json_object(...)` / `json_array(...)` arrive here
    already stringified. They are pushed as `Value::String`, which
    reproduces the legacy IT's quoted-JSON-as-element rows without the
    new engine having to implement `json_object`/`json_array` yet
    (they ship in a follow-up PR).

Java side:
  * JSON_APPEND added to `ScalarFunction`, `STANDARD_PROJECT_OPS`, and
    `scalarFunctionAdapters`.
  * `JsonAppendAdapter` is a plain `AbstractNameMappingAdapter` rename.
  * Substrait YAML signature uses `variadic: {min: 1}` — same shape as
    json_extract / json_delete / json_set.

Tests:
  * 12 Rust unit tests for json_append (3 legacy IT fixtures replayed
    with pre-stringified nested JSON: named-array push, nested-path
    push, stringified-object push; plus multi-pair / wildcard-fan-out /
    non-array-noop / missing-path-noop / any-NULL / malformed-doc /
    malformed-path / coerce_types / return_type).
  * ScalarJsonFunctionIT gains `testJsonAppendParityWithLegacy`
    replaying all 3 legacy assertions with literal stringified JSON in
    place of the nested constructor calls the legacy test uses.

Parity-checked against legacy SQL plugin
`CalcitePPLJsonBuiltinFunctionIT.testJsonAppend`.

Signed-off-by: Eric Wei <mengwei.eric@gmail.com>

* [Analytics Engine] Port json_extend to DataFusion backend

Mutation UDF opensearch-project#4 — last walker reuse. Same push shape as json_append,
but each paired value is first tried as a JSON-array parse: success →
spread the elements; failure → push the whole string as one element
(parity with legacy `JsonExtendFunctionImpl`'s `gson.fromJson(v,
List.class)` try/fall-back).

Rust side (rust/src/udf/json_extend.rs):
  * Helper `spread(raw) -> Vec<Value>`: returns the parsed items when
    `raw` is a JSON array, else `[Value::String(raw)]`. Scalars,
    objects, and malformed JSON all go through the single-push branch.
  * Terminal closure reuses json_append's array-target guards (Object
    field → Array, Array+Index → inner Array, Array+Wildcard → every
    array child). `Vec::extend(items.iter().cloned())` handles the
    spread and the single-push case uniformly.
  * Variadic arity matches every other mutation UDF. Invalid arity /
    any-NULL / malformed-doc / malformed-path → NULL.

Deliberate divergence from legacy: integer-typed spread elements stay
integers (serde_json preserves source type) rather than being widened
to Double as Gson does. Documented in `json.md:555` but not covered by
any legacy IT; we preserve the more useful default and will file a
tracking issue for the wider Gson-compat decision.

Java side:
  * JSON_EXTEND added to `ScalarFunction`, `STANDARD_PROJECT_OPS`, and
    `scalarFunctionAdapters`.
  * `JsonExtendAdapter` is a plain `AbstractNameMappingAdapter` rename.
  * Substrait YAML signature uses `variadic: {min: 1}` — same shape as
    the other variadic json_* UDFs.

Tests:
  * 13 Rust unit tests for json_extend (3 legacy IT fixtures replayed:
    single-push on non-array value, plain-string push, JSON-array
    spread; plus empty-array-value / mixed-type-spread / wildcard-fan
    / non-array-noop / missing-path-noop / any-NULL / malformed-doc /
    malformed-path / coerce_types / return_type).
  * ScalarJsonFunctionIT gains `testJsonExtendParityWithLegacy`
    replaying all 3 legacy assertions with literal stringified JSON
    standing in for the nested constructor calls the legacy test uses.

Parity-checked against legacy SQL plugin
`CalcitePPLJsonBuiltinFunctionIT.testJsonExtend`.

Signed-off-by: Eric Wei <mengwei.eric@gmail.com>

---------

Signed-off-by: Eric Wei <mengwei.eric@gmail.com>
nibix pushed a commit that referenced this pull request Jun 4, 2026
…n adapter dispatch (opensearch-project#21911)

* [analytics-engine] Wire FINAL aggregate filter drop and join-condition adapter dispatch

Two small fixes that together unblock CalciteTransposeCommandIT on the
analytics-engine route end-to-end. Both surface only when a single PPL
command produces (a) a non-prefix groupSet, (b) a FILTER aggCall, and
(c) a Join whose condition carries PPL UDFs — which is how PPL transpose
lowers via RelBuilder.unpivot()/pivot(). Today transpose is the only
PPL command that hits this combination, but the bugs are general.

1. DistributedAggregateRewriter.buildOne — drop filterArg on FINAL
-----------------------------------------------------------------
   FINAL's input is PARTIAL output, laid out as [group keys, agg states].
   The boolean column referenced by an aggCall's FILTER predicate exists
   only on the ORIGINAL child input that PARTIAL consumed; PARTIAL has
   already applied the filter while accumulating state. The Aggregate
   constructor's `isPredicate(input, filterArg)` check then fires when it
   reads filterArg=N against PARTIAL output that has fewer than N+1
   columns (or whose Nth column is non-boolean).

   Set filterArg = -1 (Calcite's "no FILTER" sentinel — there's no
   create() overload that omits it) on the FINAL call. Semantically
   correct: FILTER is a row-level gate consumed once during accumulation;
   merging states never re-applies it. This generalises to multi-stage
   chains (PARTIAL → PARTIAL2 → FINAL): only the first stage that
   consumes raw rows keeps filterArg.

   Without this fix, transpose IT fails with
   `IllegalArgumentException: filter must be BOOLEAN NOT NULL` from
   Aggregate.<init>:178.

2. BackendPlanAdapter — dispatch OpenSearchJoin for adapter rewrite
------------------------------------------------------------------
   adaptNode() walks Filter / Project / Aggregate(FINAL) and runs each
   RexNode through the backend's ScalarFunctionAdapter chain
   (e.g. ToStringFunctionAdapter rewrites NUMBER_TO_STRING to a plain
   CAST that isthmus understands). Calcite's FILTER_INTO_JOIN rule
   inlines an outer Filter's predicate into an inner Join's condition,
   so any PPL UDF that lived in the Project below the Filter rides
   into the Join condition. With Join missing from adaptNode's dispatch
   list, that copy of the UDF reaches isthmus unrewritten and trips
   "Unable to convert call NUMBER_TO_STRING(fp64?)" in
   RexExpressionConverter.

   Add an OpenSearchJoin branch that runs the join's condition through
   adaptRex with concatenated left+right field storage — same convention
   OpenSearchJoin#getOutputFieldStorage() uses on the output side.

Verified
--------
CalciteTransposeCommandIT (with `-Dtests.analytics.parquet_indices=true`)
on 21804-merged main:

* Without these fixes:        0/5 pass (5/5 hit "filter must be BOOLEAN NOT NULL")
* With #1 only:               4/5 pass (testTransposeWithValueFieldNameCollision
                                        hits "Unable to convert call NUMBER_TO_STRING(fp64?)")
* With #1 + #2:               5/5 pass

Signed-off-by: Songkan Tang <songkant@amazon.com>

* Read join field storage from the join node, not re-derived from children

Address review feedback (expani): use OpenSearchJoin#getOutputFieldStorage()
directly in adaptJoin instead of re-assembling it by unwrapping the children —
same result (the node derives it from left ++ right child storage, which traces
back to the FieldStorageInfo marked on the leaf OpenSearchTableScan), and
consistent with how adaptFilter/adaptProject read storage off their node.

Also drop the fieldStorage.isEmpty() short-circuit: adaptRex never indexes the
storage list (it only hands it to scalar adapters, which no-op when a ref has no
storage), so an empty list flows through harmlessly and yields the same result
as the explicit guard — matching adaptFilter, which has no such check.

BackendPlanAdapterTests 8/8 pass (incl. both join-condition tests).

Signed-off-by: Songkan Tang <songkant@amazon.com>

---------

Signed-off-by: Songkan Tang <songkant@amazon.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant