Describe the bug
When CometExecRule converts a BroadcastExchangeExec to CometBroadcastExchangeExec, any ReusedExchangeExec nodes that referenced the original BroadcastExchangeExec are not updated. This breaks exchange reuse, resulting in duplicate broadcast execution and an extra project node in the plan.
This is caused by rule ordering: ReuseExchangeAndSubquery runs as a physical preparation rule and inserts ReusedExchangeExec nodes that point at the original BroadcastExchangeExec. Then CometExecRule runs as a columnar rule (after preparation) and converts the join's BroadcastExchangeExec to CometBroadcastExchangeExec. The SubqueryBroadcastExec inside the DPP filter still holds a reference to the original BroadcastExchangeExec. Since the two exchanges have different canonicalized types and hashes, they can never be matched for reuse.
Returning the wrapped exchange's canonicalized form from CometBroadcastExchangeExec.doCanonicalize() would be incorrect because the two exchanges produce different output formats (Arrow columnar vs Spark HashedRelation). They are not interchangeable at runtime.
This was exposed by adding non-AQE DPP support to CometNativeScanExec. Previously the DPP scan fell back to FileSourceScanExec, so the join wasn't fully native and CometExecRule didn't convert the broadcast exchange. With DPP working natively, the full join tree converts to Comet operators and the reuse breaks.
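The failure described above comes down to canonicalized structural equality: exchange reuse keys its lookup on the canonicalized form of each exchange it has seen, and nodes of different classes never compare equal. A minimal self-contained sketch (toy case classes standing in for the real Spark/Comet operators, not actual Spark code) illustrates why the Comet exchange misses the cache entry left by the Spark one:

```scala
// Toy model of exchange reuse: the reuse rule keys a cache on the
// canonicalized form of each exchange it has already seen.
sealed trait Exchange { def canonicalized: Exchange }

// Stands in for Spark's BroadcastExchangeExec (HashedRelation output).
case class SparkBroadcast(child: String) extends Exchange {
  def canonicalized: Exchange = SparkBroadcast("norm")
}

// Stands in for CometBroadcastExchangeExec (Arrow columnar output).
case class CometBroadcast(child: String) extends Exchange {
  def canonicalized: Exchange = CometBroadcast("norm")
}

object ReuseDemo {
  def main(args: Array[String]): Unit = {
    val inSubquery = SparkBroadcast("scan") // kept by SubqueryBroadcastExec
    val onJoinSide = CometBroadcast("scan") // produced later by CometExecRule
    val cache = Map(inSubquery.canonicalized -> inSubquery)
    // Case-class equality is class-aware, so the lookup misses and the
    // broadcast runs twice instead of being reused.
    println(cache.contains(onJoinSide.canonicalized)) // prints: false
    // Forcing the two canonicalized forms to match would make this lookup
    // hit, but would wrongly treat the two output formats as interchangeable.
  }
}
```

This also makes the rejected fix concrete: forwarding the wrapped exchange's canonicalized form from doCanonicalize() would turn the miss above into a hit, substituting a columnar exchange where a HashedRelation is expected at runtime.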
Steps to reproduce
test("DPP join exchange reuse") {
  withTempDir { dir =>
    val path = s"${dir.getAbsolutePath}/data"
    spark.range(100).selectExpr(
      "id % 10 as key", "cast(id * 2 as int) as a",
      "cast(id * 3 as int) as b", "cast(id as string) as c",
      "array(id, id + 1, id + 3) as d")
      .write.partitionBy("key").parquet(path)
    spark.read.parquet(path).createOrReplaceTempView("testView")
    withSQLConf(
      SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
      val query =
        """select * from (select key, a, c, b from testView) as t1
          |join (select key, a, b, c from testView) as t2
          |on t1.key = t2.key where t2.a > 50""".stripMargin
      val df = sql(query)
      df.collect()
      val plan = df.queryExecution.executedPlan
      val reused = collectWithSubqueries(plan) {
        case e: ReusedExchangeExec => e
      }
      assert(reused.nonEmpty, "Expected exchange reuse")
    }
  }
}
Expected behavior
The plan should contain a ReusedExchangeExec on the broadcast side of the join, reusing the broadcast from the DPP SubqueryBroadcastExec:
CometBroadcastHashJoin
:- CometProject [key, a, c, b]
:  +- CometNativeScan [a, b, c, key] (DPP scan)
:        +- SubqueryBroadcast
:           +- BroadcastExchange (original)
+- ReusedExchange (reuses BroadcastExchange)
Additional context
Actual plan (no reuse, extra project):
CometBroadcastHashJoin
:- CometProject [key, a, c, b]
:  +- CometNativeScan [a, b, c, key] (DPP scan)
:        +- SubqueryBroadcast
:           +- BroadcastExchange (Spark exchange, in subquery)
+- CometBroadcastExchange (Comet exchange, NOT reused)
   +- CometProject [key, a, b, c] (extra project)
      +- CometFilter
         +- CometNativeScan
Canonicalization output confirms the mismatch:
CometBroadcastExchangeExec canonicalized: CometBroadcastExchangeExec, hash=-356598217
BroadcastExchangeExec canonicalized: BroadcastExchangeExec, hash=-555436138
On main (where DPP falls back to Spark), the join stays as BroadcastHashJoinExec with ReusedExchangeExec and only 2 project nodes.
This surfaces as a failure in Spark's RemoveRedundantProjectsSuite "join with ordering requirement" test, which expects 2 project nodes but finds 3.