feat: support SortAggregateExec [WIP]#4565
Draft
andygrove wants to merge 1 commit into
Draft
Conversation
Wire SortAggregateExec through the existing CometBaseAggregate.doConvert path so that queries planned with useObjectHashAggregate=false (or with TypedImperativeAggregate functions whose buffer formats prevent hash aggregation) can run their Partial->Final pair natively instead of falling back to Spark. CollectSet was the motivating function; the same wiring covers any other natively-implemented TypedImperativeAggregate. No proto changes: DataFusion's AggregateExec::try_new auto-detects InputOrderMode::Sorted from the child's output ordering, so a sorted input naturally produces sorted output. Wrapper reuse: createExec produces a CometHashAggregateExec (matching CometObjectHashAggregateExec). CometExec.outputOrdering already defaults to originalPlan.outputOrdering, so SortAggregateExec.outputOrdering flows through unchanged for downstream operators that elided a sort against it. Cleanups landed in passing: - Lifted baseAggregateSupportLevel and adjustOutputForNativeState onto the CometBaseAggregate trait (was duplicated 3x and 2x). - Collapsed isAggregate / findCometPartialAgg's Spark-side branches / canAggregateBeConverted's shuffle guard to BaseAggregateExec.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
N/A — proactive coverage for an aggregate operator Spark sometimes plans that Comet was previously falling back on.
Rationale for this change
Spark's planner picks
SortAggregateExecwhen neitherHashAggregateExecnorObjectHashAggregateExecfits — typically whenspark.sql.execution.useObjectHashAggregateExec=falseis set, or forTypedImperativeAggregatefunctions whose buffer state can't ride hash aggregation. Today Comet doesn't recognize the operator at all, so any such Partial→Final pair runs on Spark and tends to drag the surrounding shuffle off Comet too. CollectSet underuseObjectHashAggregate=falseis the motivating example.What changes are included in this PR?
SortAggregateExecto a newCometSortAggregateExecserde object that reuses the existingCometBaseAggregate.doConvertpath. Same Comet-shuffle gate asCometObjectHashAggregateExec(their TypedImperativeAggregate buffer formats differ between Spark and Comet, so a Comet-Partial / Spark-Final split would crash).createExecproduces aCometHashAggregateExecwrapper — the same approachCometObjectHashAggregateExecalready uses.CometExec.outputOrderingdefaults tooriginalPlan.outputOrdering, soSortAggregateExec's grouping-key ordering flows through unchanged for any downstream operator that elided a sort against it.AggregateExec::try_newauto-detectsInputOrderMode::Sortedfrom the child's output ordering, so sorted input naturally produces sorted output and the streaming-aggregate optimization kicks in for free.getSupportLevel(was duplicated 3x) andadjustOutputForNativeState(was duplicated verbatim 2x) onto theCometBaseAggregatetrait, and collapsed three Spark-side aggregate enumeration sites (isAggregate,findCometPartialAgg, the shuffle guard incanAggregateBeConverted) to matchBaseAggregateExeconce instead of listing each subclass.What's NOT included
Arbitrary user-defined
TypedImperativeAggregateUDAFs still fall back — theirupdate/merge/serializemethods are JVM code that native execution can't invoke. This PR only enablesSortAggregateExecfor aggregate functions Comet already implements natively (currentlyCollectSetandBloomFilterAggregateamong the TypedImperative set; future natives likeCollectListwill benefit automatically).How are these changes tested?
Added
CometAggregateSuitetest "SortAggregate with collect_set is converted to native": forcesuseObjectHashAggregateExec=false, asserts Spark plansSortAggregateExecfor the query, then runs Comet and asserts the entire plan executes natively and matches Spark. Verified locally with the fullCometAggregateSuite(80 tests pass) andCometExecRuleSuite.