
Exchange reuse broken when CometExecRule converts BroadcastExchangeExec after ReuseExchangeAndSubquery #4014

@mbutrovich

Description

Describe the bug

When CometExecRule converts a BroadcastExchangeExec to CometBroadcastExchangeExec, any ReusedExchangeExec nodes that referenced the original BroadcastExchangeExec are not updated. This breaks exchange reuse, resulting in duplicate broadcast execution and an extra project node in the plan.

This is caused by rule ordering: ReuseExchangeAndSubquery runs as a physical preparation rule and inserts a ReusedExchangeExec that points at the original BroadcastExchangeExec. CometExecRule then runs as a columnar rule (after preparation) and converts the join's BroadcastExchangeExec to CometBroadcastExchangeExec. The SubqueryBroadcastExec inside the DPP filter still holds a reference to the original BroadcastExchangeExec. Because the two exchanges now have different canonicalized types and hash codes, they can no longer be matched for reuse.
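The mechanics can be modeled with a toy plan tree (class names below are illustrative, not Comet's real operators): reuse is keyed by the canonicalized plan, which includes the node's class, so once one side changes node type the lookup fails.

```scala
// Toy model of exchange-reuse matching. Canonicalization preserves the
// node's class, so exchanges of different types never compare equal.
sealed trait Plan { def canonicalized: Plan }
case class Scan(table: String) extends Plan {
  def canonicalized: Plan = Scan(table)
}
case class BroadcastExchange(child: Plan) extends Plan {
  def canonicalized: Plan = BroadcastExchange(child.canonicalized)
}
case class CometBroadcastExchange(child: Plan) extends Plan {
  // Canonicalizes as its own node type, not as the wrapped Spark exchange.
  def canonicalized: Plan = CometBroadcastExchange(child.canonicalized)
}

val scan = Scan("testView")

// Before CometExecRule: both sides are Spark exchanges, so reuse matches.
val subqueryExchange = BroadcastExchange(scan)
println(subqueryExchange.canonicalized == BroadcastExchange(scan).canonicalized) // true

// After CometExecRule converts only the join side, the canonicalized forms
// (and hash codes) differ, so no ReusedExchangeExec can be set up.
val joinExchange = CometBroadcastExchange(scan)
println(subqueryExchange.canonicalized == joinExchange.canonicalized) // false
```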

Returning the wrapped exchange's canonicalized form from CometBroadcastExchangeExec.doCanonicalize() would be incorrect because the two exchanges produce different output formats (Arrow columnar vs Spark HashedRelation). They are not interchangeable at runtime.
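To see why that delegation would be unsafe, consider a toy reuse map keyed by canonicalized form (names here are hypothetical, not Comet's actual API): if the Comet exchange canonicalized to the wrapped Spark form, the map would treat two exchanges with incompatible runtime output formats as interchangeable.

```scala
// Hypothetical (deliberately incorrect) behavior: CometBroadcast delegates
// canonicalization to the wrapped Spark exchange, so a reuse map keyed by
// canonicalized form conflates exchanges with different output formats.
sealed trait Exchange { def canonicalized: String; def outputFormat: String }
case class SparkBroadcast(child: String) extends Exchange {
  def canonicalized = s"BroadcastExchange($child)"
  def outputFormat = "HashedRelation"
}
case class CometBroadcast(child: String) extends Exchange {
  def canonicalized = s"BroadcastExchange($child)" // delegates to wrapped form
  def outputFormat = "Arrow columnar batches"
}

val reuseMap = scala.collection.mutable.Map.empty[String, Exchange]
val first = SparkBroadcast("scan")
reuseMap.getOrElseUpdate(first.canonicalized, first)

// The Comet exchange "matches" and would be replaced by a reuse reference
// to the Spark one -- the wrong output format at runtime.
val second = CometBroadcast("scan")
val reused = reuseMap.getOrElseUpdate(second.canonicalized, second)
println(reused.outputFormat == second.outputFormat) // false
```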

This was exposed by adding non-AQE DPP support to CometNativeScanExec. Previously the DPP scan fell back to FileSourceScanExec, so the join wasn't fully native and CometExecRule didn't convert the broadcast exchange. With DPP working natively, the full join tree converts to Comet operators and the reuse breaks.

Steps to reproduce

test("DPP join exchange reuse") {
  withTempDir { dir =>
    val path = s"${dir.getAbsolutePath}/data"
    // Write data partitioned by `key` so the join on key can trigger DPP.
    spark.range(100).selectExpr(
      "id % 10 as key", "cast(id * 2 as int) as a",
      "cast(id * 3 as int) as b", "cast(id as string) as c",
      "array(id, id + 1, id + 3) as d")
      .write.partitionBy("key").parquet(path)
    spark.read.parquet(path).createOrReplaceTempView("testView")

    // Disable AQE to exercise the non-AQE DPP reuse path.
    withSQLConf(
      SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
      val query =
        """select * from (select key, a, c, b from testView) as t1
          |join (select key, a, b, c from testView) as t2
          |on t1.key = t2.key where t2.a > 50""".stripMargin
      val df = sql(query)
      df.collect()
      val plan = df.queryExecution.executedPlan

      val reused = collectWithSubqueries(plan) {
        case e: ReusedExchangeExec => e
      }
      assert(reused.nonEmpty, "Expected exchange reuse")
    }
  }
}

Expected behavior

The plan should contain a ReusedExchangeExec on the broadcast side of the join, reusing the broadcast from the DPP SubqueryBroadcastExec:

CometBroadcastHashJoin
:- CometProject [key, a, c, b]
:  +- CometNativeScan [a, b, c, key]  (DPP scan)
:     +- SubqueryBroadcast
:        +- BroadcastExchange         (original)
+- ReusedExchange                     (reuses BroadcastExchange)

Additional context

Actual plan (no reuse, extra project):

CometBroadcastHashJoin
:- CometProject [key, a, c, b]
:  +- CometNativeScan [a, b, c, key]  (DPP scan)
:     +- SubqueryBroadcast
:        +- BroadcastExchange          (Spark exchange, in subquery)
+- CometBroadcastExchange              (Comet exchange, NOT reused)
   +- CometProject [key, a, b, c]      (extra project)
      +- CometFilter
         +- CometNativeScan

Canonicalization output confirms the mismatch:

CometBroadcastExchangeExec canonicalized: CometBroadcastExchangeExec, hash=-356598217
BroadcastExchangeExec canonicalized: BroadcastExchangeExec, hash=-555436138

On main (where DPP falls back to Spark), the join stays as BroadcastHashJoinExec with ReusedExchangeExec and only 2 project nodes.

This surfaces as a failure in Spark's RemoveRedundantProjectsSuite "join with ordering requirement" test, which expects 2 project nodes but finds 3.

Labels

bug (Something isn't working)
priority:medium (Functional bugs, performance regressions, broken features)
