feat: Stage based fallback#4519
Conversation
|
cc: @mbutrovich @andygrove @mbutrovich @parthchandra |
|
Thanks @karuppayya. I've observed this issue as well. I introduced |
|
@karuppayya new test suites have to be manually added to |
9061d20 to
7b69f7c
Compare
|
Thanks @andygrove . I think difference is that This changes tries evaluates the full stage plan and makes a per-stage decision based on the actual transition cost. In future we can extend it to factor in estimated row counts, operator complexity, etc. |
|
Also just curious since i dont have the background, why is the CI trigger manual? |
Comet has a large ASF consumption, and we working to offload longrunning CI flows to the user forks. For now we trying to run less pipelines in parallel |
comphead
left a comment
There was a problem hiding this comment.
Thanks @karuppayya
One thing to mention is: Comet mostly works on planning phase to inject Comet nodes to be executed. AQE, DPP also.
However this approach proposes a totally new vision and make decisions in runtime? Spark stage is the runtime unit and might also depend on data distribution.
This is a lot of gray area here, for example if Spark scans a folder with 100 files which are unevenly distributed, leading spark tasks to be executed with different runtime and workload, possibly leading to overloaded single task to work more than others. Is it considered a heavy weight stage? what is the strategy to fallback? restart the task?
I just want to understand the approach better and see whats pros and cons it could bring up
|
Thanks @comphead.
Yes and no. This is still a planning-phase rule (
Currently the heuristic is structural ie it is based on counts the number of C2R transitions in a stage's plan. It doesn't take into account data volume etc. Incorporating those kinds of statistics could be a natural next step toward a more cost-based decision. |
|
Thanks @karuppayya for the explanation, having conversion counter guardrail on planner makes a lot of sense to me |
| _ >= 2, | ||
| "Must be >= 2. A reverted stage still requires at least one " + | ||
| "R2C at the columnar shuffle boundary.") | ||
| .createWithDefault(2) |
There was a problem hiding this comment.
This is probably too aggressive and is likely to cause CI failure (because the plans will not match).
There was a problem hiding this comment.
I'd like to keep the threshold at 2 eventually since more than 2 transitions means significant overhead. But for now I've bumped it to 5 for CI to pass. Once we've reviewed the logic , we can iterate on the default and fix test failures than came with it?.
| case RowToColumnarExec(child) => child | ||
| } | ||
| val reverted = stripped.transformUp { | ||
| case cometShuffle: CometShuffleExchangeExec => |
There was a problem hiding this comment.
CometBroadcastExchange extends CometPlan (not CometExec) and will not be matched by any case in this pass.
There was a problem hiding this comment.
In AQE this is already handled — the broadcast exchange is wrapped in BroadcastQueryStageExec (which extends QueryStageExec), and countTransitions stops at QueryStageExec.
For non-AQE, I've added BroadcastExchangeLike to the stop condition in countTransitions to treat it as an execution boundary.(as broadcast runs as a separate job independentof the parent stage).
| case cometExec: CometExec => | ||
| if (cometExec.originalPlan.children.size == cometExec.children.size) { | ||
| cometExec.originalPlan.withNewChildren(cometExec.children) | ||
| } else { |
There was a problem hiding this comment.
When does this get hit? We should probably never reach this in which case a warning in the log is called for.
| plan.transformUp { case exchange: ShuffleExchangeLike => | ||
| revertStageIfNeeded(exchange.child, exchange.supportsColumnar) | ||
| .map(reverted => exchange.withNewChildren(Seq(reverted))) | ||
| .getOrElse(exchange) |
There was a problem hiding this comment.
Why is there no case _ => arm like in the AQE case.
| } | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Can we add a test that reverts and then executes the plan? Maybe two variants like - fallback triggered by unsupported expressions that are reverted, and a threshold set to 1 to force revert.
Test should verify results against Spark.
There was a problem hiding this comment.
I disabled the project operator to guarantee transitions Do you think we still need a test with a naturally fallback (unsupported expression may be UDF), or is the current approach
sufficient?
| plan.transformUp { | ||
| case node if !node.isInstanceOf[QueryStageExec] && !node.supportsColumnar => | ||
| val newChildren = node.children.map { child => | ||
| if (child.supportsColumnar) ColumnarToRowExec(child) else child |
There was a problem hiding this comment.
You could also hit a Row child and a Columnar parent (the reverse transition).
There was a problem hiding this comment.
After revertToSpark, all CometExec nodes are replaced with their row-based operator. The only columnar nodes remaining would be QueryStageExec leaves (inputs from prior stages). I think we will have only columnar children feeding row-based parents. May be i am missing the scenario, an example would help.
| if (newChildren != node.children) node.withNewChildren(newChildren) else node | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
You might want to remove CometTelemetryExec as well. It won't do any harm if it is there but it's pointless to collet Comet telemetry for Spark nodes.
karuppayya
left a comment
There was a problem hiding this comment.
Thanks @parthchandra for the rview.
i have also cleaned up tets.
| case RowToColumnarExec(child) => child | ||
| } | ||
| val reverted = stripped.transformUp { | ||
| case cometShuffle: CometShuffleExchangeExec => |
There was a problem hiding this comment.
In AQE this is already handled — the broadcast exchange is wrapped in BroadcastQueryStageExec (which extends QueryStageExec), and countTransitions stops at QueryStageExec.
For non-AQE, I've added BroadcastExchangeLike to the stop condition in countTransitions to treat it as an execution boundary.(as broadcast runs as a separate job independentof the parent stage).
| plan.transformUp { case exchange: ShuffleExchangeLike => | ||
| revertStageIfNeeded(exchange.child, exchange.supportsColumnar) | ||
| .map(reverted => exchange.withNewChildren(Seq(reverted))) | ||
| .getOrElse(exchange) |
| case cometExec: CometExec => | ||
| if (cometExec.originalPlan.children.size == cometExec.children.size) { | ||
| cometExec.originalPlan.withNewChildren(cometExec.children) | ||
| } else { |
| plan.transformUp { | ||
| case node if !node.isInstanceOf[QueryStageExec] && !node.supportsColumnar => | ||
| val newChildren = node.children.map { child => | ||
| if (child.supportsColumnar) ColumnarToRowExec(child) else child |
There was a problem hiding this comment.
After revertToSpark, all CometExec nodes are replaced with their row-based operator. The only columnar nodes remaining would be QueryStageExec leaves (inputs from prior stages). I think we will have only columnar children feeding row-based parents. May be i am missing the scenario, an example would help.
| if (newChildren != node.children) node.withNewChildren(newChildren) else node | ||
| } | ||
| } | ||
| } |
| _ >= 2, | ||
| "Must be >= 2. A reverted stage still requires at least one " + | ||
| "R2C at the columnar shuffle boundary.") | ||
| .createWithDefault(2) |
There was a problem hiding this comment.
I'd like to keep the threshold at 2 eventually since more than 2 transitions means significant overhead. But for now I've bumped it to 5 for CI to pass. Once we've reviewed the logic , we can iterate on the default and fix test failures than came with it?.
| } | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
I disabled the project operator to guarantee transitions Do you think we still need a test with a naturally fallback (unsupported expression may be UDF), or is the current approach
sufficient?
| stagePlan: SparkPlan, | ||
| outputColumnar: Boolean): Option[SparkPlan] = { | ||
| val transitionCount = countTransitions(stagePlan) | ||
| if (transitionCount <= maxTransitions) return None |
There was a problem hiding this comment.
To make a user friendly response it would be better to use withFallbackReason instead of logging. Example CometExecRule.scala
Which issue does this PR close?
Closes #4518 .
Rationale for this change
Introduces a stage-aware fallback that reverts stages to Spark row-based execution when the number of C2R transitions exceeds a threshold.
What changes are included in this PR?
RevertNativeForTransitionHeavyStages—postColumnarTransitionsrule that counts C2R transitions per stage and reverts viaCometExec.originalPlanwhen threshold exceededspark.comet.exec.transitionRevert.enabled(default: true),spark.comet.exec.transitionRevert.maxTransitions(default: 2, min: 2)How are these changes tested?
Unit tests added