[AURON #2358] Support native last / last(ignoreNulls) aggregate#2359
Open
zhuxiangyi wants to merge 1 commit into
Open
[AURON #2358] Support native last / last(ignoreNulls) aggregate#2359zhuxiangyi wants to merge 1 commit into
zhuxiangyi wants to merge 1 commit into
Conversation
Implement native last / last(ignoreNulls) aggregates, mirroring the existing first implementation with "later value wins" semantics: - native: add AggLast / AggLastIgnoresNull (agg/last.rs, agg/last_ignores_null.rs); wire through the AggFunction enum, create_agg, the protobuf contract (LAST / LAST_IGNORES_NULL), the protobuf->AggFunction conversion, and the window-agg mapping. - spark-extension: add the Last expression conversion in NativeConverters; declare the Last native aggregate buffer schema in NativeAggBase.computeNativeAggBufferDataTypes ([dataType] for ignoreNulls, [dataType, Boolean] otherwise) so the partial -> shuffle -> final buffer schema matches the native side. Tests: - Rust unit test agg_exec::test::test_agg_last (partial -> final, nulls). - Scala e2e AuronDataFrameAggregateSuite "native last / last(ignoreNulls) aggregate" (spark34 + spark35), covering the partial -> shuffle -> final native path and asserting NativeAggBase offload.
605140f to
bcb152a
Compare
Contributor
There was a problem hiding this comment.
Pull request overview
Adds native (vectorized) execution support for Spark’s last / last(ignoreNulls) aggregates by implementing corresponding native accumulators in datafusion-ext-plans and wiring the end-to-end Spark ↔ protobuf ↔ planner ↔ native execution path, with coverage in both Rust unit tests and Spark (Scala) integration tests.
Changes:
- Implement native
AggLastandAggLastIgnoresNullcolumnar accumulators (including two-phase partial→final merge behavior) and register them in the native agg factory. - Extend the protobuf contract and planner mappings to recognize
LAST/LAST_IGNORES_NULL, including window aggregate mapping. - Add Spark-side aggregate conversion and native buffer schema declarations, plus Spark 3.4/3.5 end-to-end tests validating correctness and offload.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala | Declares native agg buffer schema for Last to match native accumulator layouts. |
| spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala | Converts Spark Last aggregate into native protobuf LAST / LAST_IGNORES_NULL. |
| native-engine/datafusion-ext-plans/src/agg/mod.rs | Registers new last modules and extends AggFunction enum with Last variants. |
| native-engine/datafusion-ext-plans/src/agg/last.rs | Implements native last(col) accumulator with value + “visited” flag semantics. |
| native-engine/datafusion-ext-plans/src/agg/last_ignores_null.rs | Implements native last(col, ignoreNulls=true) accumulator with overwrite-on-non-null semantics. |
| native-engine/datafusion-ext-plans/src/agg/agg.rs | Wires AggFunction::{Last,LastIgnoresNull} into create_agg. |
| native-engine/datafusion-ext-plans/src/agg_exec.rs | Adds Rust unit test validating two-phase aggregation semantics for both last modes. |
| native-engine/auron-planner/src/planner.rs | Adds window-function mapping for protobuf Last variants to native AggFunction. |
| native-engine/auron-planner/src/lib.rs | Adds protobuf→native AggFunction conversions for Last variants. |
| native-engine/auron-planner/proto/auron.proto | Extends AggFunction protobuf enum with LAST / LAST_IGNORES_NULL. |
| auron-spark-tests/spark35/src/test/scala/org/apache/spark/sql/AuronDataFrameAggregateSuite.scala | Adds Spark 3.5 E2E test asserting correctness and native offload for last. |
| auron-spark-tests/spark34/src/test/scala/org/apache/spark/sql/AuronDataFrameAggregateSuite.scala | Adds Spark 3.4 E2E test asserting correctness and native offload for last. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Closes #2358
Rationale for this change
Auron currently accelerates
first/first(ignoreNulls)natively, butlast/last(ignoreNulls)fall back to the generic UDAF path (a JNI call back into the JVM), losing vectorized acceleration. This PR adds nativelastsupport.What changes are included in this PR?
datafusion-ext-plans): addAggLast/AggLastIgnoresNull(agg/last.rs,agg/last_ignores_null.rs), mirroring thefirstcolumnar accumulators with "later value wins" semantics. Wire them through theAggFunctionenum,create_agg, the protobuf contract (LAST/LAST_IGNORES_NULL), theprotobuf::AggFunction -> AggFunctionconversion, and the window-aggregate mapping.Lastexpression conversion inNativeConverters.convertAggregateExpr; declare theLastnative aggregate buffer schema inNativeAggBase.computeNativeAggBufferDataTypes([dataType]forignoreNulls,[dataType, Boolean]otherwise) so the partial -> shuffle -> final buffer schema matches the native side.Are there any user-facing changes?
Yes.
last(col)andlast(col, ignoreNulls = true)are now executed natively (vectorized); previously they fell back to the UDAF path.How was this patch tested?
agg_exec::test::test_agg_last: partial -> final two-phase aggregation over a nullable column, verifyinglast(keeps the last visited row including null) andlast(ignoreNulls)(keeps the last non-null value).AuronDataFrameAggregateSuite("native last / last(ignoreNulls) aggregate", spark34 + spark35): a grouped aggregate exercising the full partial -> shuffle -> final native path, asserting correct values and that the plan offloads toNativeAggBase.